#!/usr/bin/perl
# $Id$
#

use OAR::IO;
use Sys::Hostname;
use OAR::Conf qw(init_conf dump_conf get_conf is_conf);
use OAR::Modules::Judas qw(oar_debug oar_warn oar_error send_log_by_email set_current_log_category);
use OAR::PingChecker;
use OAR::Tools;
use Data::Dumper;
use IPC::Open2;

# Location of the OAR::Tools module file; it is listed in @oarexec_files
# further down and streamed to the remote perl together with oarexec.
my $OAR_Tools_module_filepath = "/usr/share/perl5/OAR/Tools.pm";

# Data::Dumper is used to serialize the job description sent to oarexec:
# single-line, self-contained, re-evaluable output.
(
    $Data::Dumper::Purity,
    $Data::Dumper::Terse,
    $Data::Dumper::Indent,
    $Data::Dumper::Deepcopy
) = (1, 1, 0, 1);

# All messages of this script go to the 'main' log category
set_current_log_category('main');

# Command line: job id, then (only when called for the end of an oarexec)
# the oarexec exit value, the user script exit value and the job challenge.
# Missing arguments are left undefined.
my ($Job_id,
    $Oarexec_reattach_exit_value,
    $Oarexec_reattach_script_exit_value,
    $Oarexec_challenge) = splice(@ARGV, 0, 4);

# Load the OAR configuration file and read the server address
init_conf($ENV{OARCONFFILE});
my $Remote_host = get_conf("SERVER_HOSTNAME");
my $Remote_port = get_conf("SERVER_PORT");

# Settings falling back on the OAR::Tools defaults when absent from the
# configuration file
my $Node_file_db_field = get_conf("NODE_FILE_DB_FIELD");
unless (defined($Node_file_db_field)){
    $Node_file_db_field = OAR::Tools::get_default_node_file_db_field();
}

my $Node_file_db_field_distinct_values = get_conf("NODE_FILE_DB_FIELD_DISTINCT_VALUES");
unless (defined($Node_file_db_field_distinct_values)){
    $Node_file_db_field_distinct_values = OAR::Tools::get_default_node_file_db_field_distinct_values();
}

my $Openssh_cmd = get_conf("OPENSSH_CMD");
unless (defined($Openssh_cmd)){
    $Openssh_cmd = OAR::Tools::get_default_openssh_cmd();
}

# Name given to oarexec to contact Almighty: a loopback server name is
# replaced by the real host name of this machine
my $Almighty_hostname = $Remote_host;
$Almighty_hostname = hostname()
    if (($Almighty_hostname =~ /^\s*localhost.*$/) or ($Almighty_hostname =~ /^\s*127.*$/));

# Frontends for deploy and cosystem jobs default to the server itself
my $Deploy_hostname = get_conf("DEPLOY_HOSTNAME");
$Deploy_hostname = $Remote_host unless (defined($Deploy_hostname));

my $Cosystem_hostname = get_conf("COSYSTEM_HOSTNAME");
$Cosystem_hostname = $Remote_host unless (defined($Cosystem_hostname));

# Flags that are off unless configured
my $Debug_mode = get_conf("OAREXEC_DEBUG_MODE");
$Debug_mode = 0 unless (defined($Debug_mode));

my $Detach_oarexec = get_conf("DETACH_JOB_FROM_SERVER");
$Detach_oarexec = 0 unless (defined($Detach_oarexec));

OAR::Tools::set_default_oarexec_directory(get_conf("OAR_RUNTIME_DIRECTORY"))
    if (is_conf("OAR_RUNTIME_DIRECTORY"));

# Scripts run on the server around the job (both may be undefined)
my $Server_prologue = get_conf("SERVER_PROLOGUE_EXEC_FILE");
my $Server_epilogue = get_conf("SERVER_EPILOGUE_EXEC_FILE");

# Cpuset / job resource manager settings; a relative manager file name is
# taken relatively to $OARDIR
my $Cpuset_field = get_conf("JOB_RESOURCE_MANAGER_PROPERTY_DB_FIELD");
my $Cpuset_file = get_conf("JOB_RESOURCE_MANAGER_FILE");
unless (defined($Cpuset_file)){
    $Cpuset_file = OAR::Tools::get_default_cpuset_file();
}
unless ($Cpuset_file =~ /^\//){
    $Cpuset_file = $ENV{OARDIR}.'/'.$Cpuset_file;
}

OAR::Tools::set_ssh_timeout(get_conf("OAR_SSH_CONNECTION_TIMEOUT"))
    if (is_conf("OAR_SSH_CONNECTION_TIMEOUT"));

# User the job runs as; filled in once the job is known to be launchable
my $Job_user;

# Connection to the OAR database, shared by the rest of the script
my $base = OAR::IO::connect();

my $Cpuset_path = get_conf("CPUSET_PATH");
# Declared separately from its conditional assignment: "my $x = ... if (...)"
# has undefined behaviour in Perl (see perlsyn, "Statement Modifiers").
# The name stays undefined when no cpuset DB field is configured.
my $Cpuset_name;
$Cpuset_name = OAR::IO::get_job_cpuset_name($base, $Job_id) if (defined($Cpuset_field));
# Full cpuset path of the job, only when both the path and the field are configured
my $Cpuset_full_path;
if (defined($Cpuset_path) and defined($Cpuset_field)){
    $Cpuset_full_path = $Cpuset_path.'/'.$Cpuset_name;
}

# Job data read once at startup
my ($job_challenge,$ssh_private_key,$ssh_public_key) = OAR::IO::get_job_challenge($base,$Job_id);
my @hosts = OAR::IO::get_job_current_hostnames($base,$Job_id);
my $job = OAR::IO::get_job($base,$Job_id);
# Filled later, only when the job is really going to be launched
my @resources;
my $moldable;

# Run the server prologue (SERVER_PROLOGUE_EXEC_FILE) on the OAR server.
#
# The command "$Server_prologue $Job_id" is executed in a forked child and
# bounded by SERVER_PROLOGUE_EPILOGUE_TIMEOUT (or the OAR::Tools default).
#
# Returns normally when no prologue is configured or when it exits with 0.
# Otherwise (timeout, non-zero exit code, impossible to start the command):
#   - the error is logged and recorded as an event on the job,
#   - the database connection $base is closed,
#   - Almighty is notified with "ChState",
#   - the oarsub client of an interactive non-reservation job is notified,
#   - the script exits with code 2.
#
# Side effect: SIGPIPE is ignored and a SIGALRM handler is installed for the
# rest of the process when a prologue is configured.
sub call_server_prologue {
    # PROLOGUE EXECUTED ON OAR SERVER #
    return if (!defined($Server_prologue));

    # The script is executed with the job id as only argument
    my $cmd = "$Server_prologue $Job_id";
    my $timeout = OAR::Tools::get_default_server_prologue_epilogue_timeout();
    if (is_conf("SERVER_PROLOGUE_EPILOGUE_TIMEOUT")){
        $timeout = get_conf("SERVER_PROLOGUE_EPILOGUE_TIMEOUT");
    }

    my $pid;
    my $exit_value;
    eval{
        $SIG{PIPE} = 'IGNORE';
        $SIG{ALRM} = sub { die "alarm\n" };
        alarm($timeout);
        $pid = fork();
        # fork() returns undef on failure and "undef == 0" is true: without
        # this check the parent itself would exec the prologue below.
        die("cannot fork: $!\n") if (!defined($pid));
        if ($pid == 0){
            # Child: drop the inherited DB handle and become the prologue
            undef($base);
            exec($cmd);
            warn("[ERROR] Cannot find $cmd\n");
            exit(-1);
        }
        my $wait_res = 0;
        # Avoid being disrupted by a signal
        while ($wait_res != $pid){
            $wait_res = waitpid($pid,0);
        }
        alarm(0);
        # NOTE(review): a prologue killed by a signal yields exit value 0 here
        # and is therefore treated as a success -- confirm this is intended.
        $exit_value = $? >> 8;
    };
    my $eval_error = $@;
    # Never leave the alarm armed once the eval is over: its handler dies and
    # would abort the script if it fired outside of an eval.
    alarm(0);

    # Work out what went wrong, if anything
    my ($event_type,$str,$client_msg);
    if ($eval_error eq "alarm\n"){
        if (defined($pid)){
            my ($children) = OAR::Tools::get_one_process_children($pid);
            kill(9,@{$children});
        }
        $event_type = "SERVER_PROLOGUE_TIMEOUT";
        $str = "[bipbip $Job_id] Server prologue timeouted (cmd : $cmd)";
        $client_msg = "ERROR: SERVER PROLOGUE timeouted";
    }elsif ($eval_error ne ""){
        # The prologue could not even be started; reported with the same
        # event type as a failing prologue so that the job is not launched.
        chomp($eval_error);
        $event_type = "SERVER_PROLOGUE_EXIT_CODE_ERROR";
        $str = "[bipbip $Job_id] Server prologue cannot be executed: $eval_error (cmd : $cmd)";
        $client_msg = "ERROR: SERVER PROLOGUE returned a bad value";
    }elsif ($exit_value != 0){
        $event_type = "SERVER_PROLOGUE_EXIT_CODE_ERROR";
        $str = "[bipbip $Job_id] Server prologue exit code $exit_value (!=0) (cmd : $cmd)";
        $client_msg = "ERROR: SERVER PROLOGUE returned a bad value";
    }else{
        # Prologue succeeded
        return;
    }

    # Common failure path: record, notify and stop bipbip
    oar_error("$str\n");
    OAR::IO::add_new_event($base,$event_type,$Job_id,"$str");
    OAR::IO::disconnect($base);
    OAR::Tools::notify_tcp_socket($Remote_host,$Remote_port,"ChState");
    if (($job->{job_type} eq "INTERACTIVE") and ($job->{reservation} eq "None")){
        my ($addr,$port) = split(/:/,$job->{info_type});
        OAR::Tools::notify_tcp_socket($addr,$port,$client_msg);
    }
    exit(2);
}

# Two ways of being called:
#  - with an oarexec exit value while the job is active: handle the end of
#    the job and exit;
#  - otherwise: the job must be in the "toLaunch" state and is switched to
#    "Launching" before going on with the launch.
if ((defined($Oarexec_reattach_exit_value)) and
    (scalar(grep { $job->{state} eq $_ } ("Launching","Running","Suspended","Resuming")) > 0)
   ){
    oar_debug("[bipbip $Job_id] OAREXEC end: $Oarexec_reattach_exit_value $Oarexec_reattach_script_exit_value\n");

    # NOTE(review): this pattern is not anchored, so any value containing a
    # digit is accepted -- confirm whether /^\d+$/ was intended.
    unless ($Oarexec_reattach_exit_value =~ m/\d+/m){
        oar_error("[bipbip $Job_id] Bad argument for bipbip : $Oarexec_reattach_exit_value\n");
        OAR::IO::disconnect($base);
        exit(2);
    }

    # The challenge given by oarexec must be the one stored for the job
    if ($Oarexec_challenge ne $job_challenge){
        oar_error("[bipbip $Job_id] Bad challenge from oarexec, perhaps a pirate attack??? ($Oarexec_challenge).\n");
        OAR::IO::add_new_event($base, "BIPBIP_CHALLENGE", $Job_id, "Bad challenge from oarexec, perhaps a pirate attack??? ($Oarexec_challenge)");
        OAR::IO::disconnect($base);
        exit(2);
    }

    OAR::IO::check_end_of_job($base, $Job_id, $Oarexec_reattach_script_exit_value, $Oarexec_reattach_exit_value, \@hosts, $Remote_host, $Remote_port,$job->{job_user},$job->{launching_directory},$Server_epilogue);
    OAR::IO::disconnect($base);
    exit(0);
}else{
    @resources = OAR::IO::get_current_assigned_job_resources($base,$job->{assigned_moldable_job});
    $moldable = OAR::IO::get_current_moldable_job($base,$job->{assigned_moldable_job});
    # Read the job again: its state may have changed since the first read
    $job = OAR::IO::get_job($base,$Job_id);
    if ($job->{state} ne "toLaunch"){
        oar_warn("[bipbip $Job_id] Job already treated or deleted in the meantime\n");
        OAR::IO::disconnect($base);
        exit(1);
    }
    # Tell that the launching process is initiated
    OAR::IO::set_job_state($base,$Job_id,"Launching");
    $job->{state} = "Launching";
}

# NOOP jobs: nothing is executed on the nodes. The job is directly set to
# "Running", the server prologue is run and bipbip stops here.
my $types = OAR::IO::get_job_types_hash($base,$Job_id);
if (defined($types->{noop})){
    OAR::IO::set_job_state($base,$Job_id,"Running");
    oar_debug("[bipbip $Job_id] User: $job->{job_user}; Set NOOP job to Running\n");
    # The database connection must still be open here: when the prologue
    # fails, call_server_prologue() records an event through $base and
    # closes the connection itself before exiting.
    call_server_prologue();
    OAR::IO::disconnect($base);
    exit(0);
}

# From here on, oarexec has to be launched on the first node of the job
oar_debug("[bipbip $Job_id] JOB: $Job_id; User: $job->{job_user}; Command: $job->{command} ==> hosts :[@hosts]\n");

# An interactive oarsub (not a reservation) is waiting on the address stored
# in info_type ("host:port"): tell it that the launch begins
if (($job->{reservation} eq "None") and ($job->{job_type} eq "INTERACTIVE")){
    my ($client_host,$client_port) = split(/:/,$job->{info_type});
    OAR::Tools::notify_tcp_socket($client_host,$client_port,"Starting...");
}

$Job_user = $job->{job_user};

# Node preparation, only for jobs that are neither "deploy" nor "cosystem"
# and that have at least one host:
#   1. initialize the cpuset of the job on its nodes (when a cpuset DB field
#      is configured),
#   2. if no node is reported bad at that point, check the hosts with
#      OAR::PingChecker,
#   3. record the bad nodes as an event; bipbip exits with code 2 unless the
#      job is a reservation that still has at least one good node.
if ((!defined($types->{deploy})) and (!(defined($types->{cosystem})) and ($#hosts >= 0))){
    # Event type recorded for the bad nodes; depends on which step found them
    my $event_type;
    # Hostnames reported as not working by the cpuset step or the ping check
    my @bad;
    ###############
    # CPUSET PART #
    ###############
    my $cpuset_nodes;
    $cpuset_nodes = OAR::IO::get_cpuset_values_for_a_moldable_job($base,$Cpuset_field,$job->{assigned_moldable_job}) if (defined($Cpuset_field));

    $Job_user = $job->{job_user};
    $ssh_public_key = OAR::Tools::format_ssh_pub_key($ssh_public_key,$Cpuset_full_path,$job->{job_user},$Job_user);
    # Data handed to the job resource manager file ($Cpuset_file) through
    # OAR::Tools::manage_remote_commands
    my $cpuset_data_hash = {
        job_id => $Job_id,
        name => $Cpuset_name,
        nodes => $cpuset_nodes,
        cpuset_path => $Cpuset_path,
        ssh_keys => {
                        public => {
                                    file_name => OAR::Tools::get_default_oar_ssh_authorized_keys_file(),
                                    key => $ssh_public_key
                                  },
                        private => {
                                    file_name => OAR::Tools::get_private_ssh_key_file_name($Cpuset_name),
                                    key => $ssh_private_key
                                   }
                    },
        oar_tmp_directory => OAR::Tools::get_default_oarexec_directory(),
        user => $job->{job_user},
        job_user => $Job_user,
        types => $types,
        resources => \@resources,
        node_file_db_fields => $Node_file_db_field,
        node_file_db_fields_distinct_values => $Node_file_db_field_distinct_values,
        array_id            => $job->{array_id},
        array_index         => $job->{array_index},
        stdout_file         => OAR::Tools::replace_jobid_tag_in_string($job->{stdout_file},$Job_id),
        stderr_file         => OAR::Tools::replace_jobid_tag_in_string($job->{stderr_file},$Job_id),
        launching_directory => $job->{launching_directory},
        job_name            => $job->{job_name},
        walltime_seconds    => $moldable->{moldable_walltime},
        walltime            => OAR::IO::duration_to_sql($moldable->{moldable_walltime}),
        project             => $job->{project},
        log_level => OAR::Modules::Judas::get_log_level()
    };
    if (defined($cpuset_nodes)){
        my $taktuk_cmd = get_conf("TAKTUK_CMD");
        # Run the "init" action on every node of the job; the first returned
        # value is a status tag, the others are the hosts that failed
        my ($tag,@bad_tmp) = OAR::Tools::manage_remote_commands([keys(%{$cpuset_nodes})],$cpuset_data_hash,$Cpuset_file,"init",$Openssh_cmd,$taktuk_cmd,$base);
        if ($tag == 0){
            # A tag of 0 is reported as a problem with the cpuset file itself;
            # it is logged and recorded, and the launch goes on
            my $str = "[bipbip $Job_id] [CPUSET] Bad cpuset file : $Cpuset_file\n";
            OAR::Modules::Judas::oar_error($str);
            OAR::IO::add_new_event($base, "CPUSET_MANAGER_FILE", $Job_id, $str);
        }else{
            push (@bad, @bad_tmp);
            $event_type = "CPUSET_ERROR";
            # Clean already configured cpuset
            my @tmp_array = keys(%{$cpuset_nodes});
            # Only when some nodes failed, but not all of them
            if (($#bad >= 0) and ($#tmp_array > $#bad)){
                # Verify if the job is a reservation
                if ($job->{reservation} ne "None"){
                    # Reservation: go on with the nodes that did not fail
                    my @tmp_hosts;
                    # Keep only alive nodes (hosts that are not in @bad)
                    foreach my $n (@hosts){
                        my $i = 0;
                        while (($i <= $#bad) and ($n ne $bad[$i])){
                            $i++;
                        }
                        push(@tmp_hosts, $n) if ($i > $#bad);
                    }
                    OAR::IO::lock_table($base,["jobs","job_state_logs","resources","assigned_resources","frag_jobs","challenges","moldable_job_descriptions","job_types","job_dependencies","job_resource_groups","job_resource_descriptions","event_logs","event_log_hostnames"]);
                    OAR::IO::set_job_message($base,$Job_id,"One or several nodes are not responding correctly(CPUSET_ERROR)");
                    OAR::IO::add_new_event_with_host($base,"$event_type",$Job_id,"[bipbip] OAR cpuset suspects nodes for the job $Job_id : @bad",\@bad);
                    OAR::IO::archive_some_moldable_job_nodes($base,$job->{assigned_moldable_job},\@bad);
                    OAR::IO::unlock_table($base);
                    OAR::Tools::notify_tcp_socket($Remote_host,$Remote_port,"ChState");
                    # The bad nodes have been dealt with: continue with the
                    # remaining hosts and an empty bad list
                    @hosts = @tmp_hosts;
                    @bad = ();
                }else{
                    # Not a reservation: remove the non initialized nodes from
                    # the node set, then run the "clean" action on the others
                    foreach my $b (@bad){
                        delete($cpuset_nodes->{$b});
                    }
                    $cpuset_data_hash->{nodes}=$cpuset_nodes;
                    my ($tag,@bad_tmp) = OAR::Tools::manage_remote_commands([keys(%{$cpuset_nodes})],$cpuset_data_hash,$Cpuset_file,"clean",$Openssh_cmd,$taktuk_cmd,$base);
                    # Nodes failing the clean action are bad nodes too
                    push(@bad, @bad_tmp);
                }
            }
        }
    }
    #####################
    # CPUSET PART, END  #
    #####################
    #if (($#bad < 0) and (!defined($cpuset_nodes))){
    if ($#bad < 0){
        # CHECK nodes: no bad node so far, test the hosts with the ping checker
        oar_debug("[bipbip $Job_id] Check nodes : @hosts\n");
        $event_type = "PING_CHECKER_NODE_SUSPECTED";
        @bad = OAR::PingChecker::test_hosts(@hosts);
    }
    if ($#bad >= 0){
        OAR::IO::lock_table($base,["jobs","job_state_logs","resources","assigned_resources","frag_jobs","challenges","moldable_job_descriptions","job_types","job_dependencies","job_resource_groups","job_resource_descriptions","event_logs","event_log_hostnames"]);
        OAR::IO::set_job_message($base,$Job_id,"One or several nodes are not responding correctly");
        oar_error("[bipbip $Job_id] /!\\ Some nodes are inaccessible ($event_type):\n@bad\n");
        # Set to 0 only for a reservation that still has a good node
        my $exit_bipbip = 1;
        if (($job->{job_type} eq "INTERACTIVE") and ($job->{reservation} eq "None")){
            # Tell the waiting oarsub client (address stored in info_type)
            my ($addr,$port) = split(/:/,$job->{info_type});
            OAR::Tools::notify_tcp_socket($addr,$port,"ERROR: some resources did not respond");
        }else{
            # Verify if the job is a reservation
            if ($job->{reservation} ne "None"){
                # Check whether at least one node of the reservation is alive
                my @tmp_hosts;
                # Keep only alive nodes (hosts that are not in @bad)
                foreach my $n (@hosts){
                    my $i = 0;
                    while (($i <= $#bad) and ($n ne $bad[$i])){
                        $i++;
                    }
                    push(@tmp_hosts, $n) if ($i > $#bad);
                }
                if ($#tmp_hosts < 0){
                    OAR::IO::add_new_event($base,"RESERVATION_NO_NODE",$job->{job_id},"There is no alive node for the reservation $job->{job_id}.");
                }else{
                    # The reservation goes on with the alive nodes only
                    @hosts = @tmp_hosts;
                    $exit_bipbip = 0;
                }
            }
        }
        #OAR::IO::set_job_state($base,$Job_id,"Error");
        OAR::IO::add_new_event_with_host($base,"$event_type",$Job_id,"[bipbip] OAR suspects nodes for the job $Job_id : @bad",\@bad);
        OAR::IO::unlock_table($base);
        OAR::Tools::notify_tcp_socket($Remote_host,$Remote_port,"ChState");
        if ($exit_bipbip == 1){
            OAR::IO::disconnect($base);
            exit(2);
        }
    }else{
        oar_debug("[bipbip $Job_id] No (enough) bad node\n");
    }
    # end CHECK
}

# Server side prologue first; bipbip exits there if it fails
call_server_prologue();

# Everything below prepares the call of oarexec on the first node

# Timeout of the node prologue/epilogue: configured value or OAR default
my $pro_epi_timeout = OAR::Tools::get_default_prologue_epilogue_timeout();
$pro_epi_timeout = get_conf("PROLOGUE_EPILOGUE_TIMEOUT") if (is_conf("PROLOGUE_EPILOGUE_TIMEOUT"));

# Node prologue and epilogue scripts; left undefined when not configured
my $prologue_exec_file;
$prologue_exec_file = get_conf("PROLOGUE_EXEC_FILE") if (is_conf("PROLOGUE_EXEC_FILE"));
my $epilogue_exec_file;
$epilogue_exec_file = get_conf("EPILOGUE_EXEC_FILE") if (is_conf("EPILOGUE_EXEC_FILE"));

# Files streamed, in this order, to the remote perl interpreter
my @oarexec_files = ($OAR_Tools_module_filepath,"$ENV{OARDIR}/oarexec");

# Host where oarexec is started: the cosystem frontend for cosystem jobs and
# jobs without host, the deploy frontend for deploy jobs, otherwise the
# first node of the job
my $host_to_connect_via_ssh;
if ((defined($types->{cosystem})) or ($#hosts < 0)){
    $host_to_connect_via_ssh = $Cosystem_hostname;
}elsif (defined($types->{deploy})){
    $host_to_connect_via_ssh = $Deploy_hostname;
}else{
    $host_to_connect_via_ssh = $hosts[0];
}

oar_debug("[bipbip $Job_id] execute oarexec on node $host_to_connect_via_ssh\n");

# The cpuset path is given to oarexec only for jobs really running on their
# nodes. So oarexec will retry several times to contact Almighty until it
# will be killed by the cpuset manager.
my $oarexec_cpuset_path;
$oarexec_cpuset_path = $Cpuset_full_path
    if (defined($Cpuset_full_path)
        and (!defined($types->{cosystem}))
        and (!defined($types->{deploy}))
        and ($#hosts >= 0));

# Job description sent to oarexec: it is serialized with Data::Dumper and
# written on the standard input of the remote perl, after the oarexec code.
my %data_to_transfer = (
        job_id              => $Job_id,
        array_id            => $job->{array_id},
        array_index         => $job->{array_index},
        stdout_file         => OAR::Tools::replace_jobid_tag_in_string($job->{stdout_file},$Job_id),
        stderr_file         => OAR::Tools::replace_jobid_tag_in_string($job->{stderr_file},$Job_id),
        launching_directory => $job->{launching_directory},
        job_env             => $job->{job_env},
        resources           => \@resources,
        node_file_db_fields => $Node_file_db_field,
        node_file_db_fields_distinct_values => $Node_file_db_field_distinct_values,
        user                => $job->{job_user},
        job_user            => $Job_user,
        types               => $types,
        name                => $job->{job_name},
        project             => $job->{project},
        reservation         => $job->{reservation},
        walltime_seconds    => $moldable->{moldable_walltime},
        walltime            => OAR::IO::duration_to_sql($moldable->{moldable_walltime}),
        command             => $job->{command},
        challenge           => $job_challenge,
        almighty_hostname   => $Almighty_hostname,
        almighty_port       => $Remote_port,
        checkpoint_signal   => $job->{checkpoint_signal},
        debug_mode          => $Debug_mode,
        mode                => $job->{job_type},
        pro_epi_timeout     => $pro_epi_timeout,
        prologue            => $prologue_exec_file,
        epilogue            => $epilogue_exec_file,
        # Explicit call parentheses: a bareword here is only a function call
        # as long as the sub is known at compile time, otherwise it silently
        # becomes a string.
        tmp_directory       => OAR::Tools::get_default_oarexec_directory(),
        detach_oarexec      => $Detach_oarexec,
        # Undefined unless the job really runs inside a cpuset on its nodes
        cpuset_full_path    => $oarexec_cpuset_path
    );

# Launch oarexec: a perl interpreter is started through ssh on the target
# host, the oarexec code and the job description are written on its standard
# input, then its output is read until it ends.
#
# $error keeps the value 50 when the launch is aborted before the exit code
# of the ssh command could be read; $exit_script_value keeps 'N' when
# oarexec did not report the exit code of the user script.
my $error = 50;
my $exit_script_value = 'N';
my $init_done = 0;
eval {
    my $pid;
    $SIG{ALRM} = sub {
        # $pid is undefined until open2() has returned, and kill(9,undef)
        # would signal pid 0, i.e. the whole process group of bipbip
        kill(9,$pid) if (defined($pid));
        die "alarm\n";
    };
    alarm($pro_epi_timeout + OAR::Tools::get_bipbip_ssh_hashtable_send_timeout() + OAR::Tools::get_ssh_timeout());
    my $ssh_option = "";
    if ((defined($Cpuset_full_path) and (!defined($types->{cosystem})) and (!defined($types->{deploy})) and ($#hosts >= 0))){
        # for oarsh_shell connection
        $ENV{OAR_CPUSET} = $Cpuset_full_path;
        $ssh_option = "-oSendEnv=OAR_CPUSET";
    }else{
        $ENV{OAR_CPUSET} = "";
    }
    $pid = open2(\*READ, \*WRITE, "$Openssh_cmd $ssh_option -x -T $host_to_connect_via_ssh perl - $Job_id OAREXEC");
    # Send the code to execute (OAR::Tools module, then oarexec itself)
    foreach my $f (@oarexec_files){
        open(my $code_fh, '<', $f) or die("Cannot open $f\n");
        while (my $code_line = <$code_fh>){
            print(WRITE $code_line);
        }
        close($code_fh);
    }
    # End of oarexec script transfer
    print(WRITE "\n__END__\n");

    # Send data structure for oarexec
    print(WRITE Dumper(\%data_to_transfer)."\n");

    close(WRITE);

    # A named variable is used for the current line: the functions called in
    # the loop must not be able to clobber it through $_
    while (my $line = <READ>){
        oar_debug("[bipbip $Job_id] $line");
        if (($init_done == 0) and ($line eq OAR::Tools::get_bipbip_oarexec_rendez_vous())){
            # oarexec has finished its initialization: the job is running
            $init_done = 1;
            if ($Detach_oarexec == 0){
                # bipbip stays attached for the whole job: no more timeout
                alarm(0);
            }
            OAR::IO::set_job_state($base,$Job_id,"Running");
            # Notify interactive oarsub
            if (($job->{job_type} eq "INTERACTIVE") and ($job->{reservation} eq "None")){
                oar_debug("[bipbip $Job_id] Interactive request ;Answer to the client Qsub -I\n");
                my ($addr,$port) = split(/:/,$job->{info_type});
                # A defined return value is handled as a notification failure
                if (defined(OAR::Tools::notify_tcp_socket($addr,$port,"GOOD JOB"))){
                    oar_error("[bipbip $Job_id] Frag job $Job_id because oarsub cannot be notified by the server on host: $addr:$port. Check your network and firewall configuration\n");
                    OAR::IO::lock_table($base,["frag_jobs","event_logs","jobs"]);
                    OAR::IO::frag_job($base,$Job_id);
                    OAR::IO::unlock_table($base);
                    OAR::Tools::notify_tcp_socket($Remote_host,$Remote_port,"Qdel");
                }
            }
            # Do not keep a DB connection open during the whole job
            OAR::IO::disconnect($base) if ($Detach_oarexec == 0);
        }

        # Get user script exit code
        if ($line =~ /^OAREXEC_SCRIPT_EXIT_VALUE\s(\d+|N)$/){
            $exit_script_value = $1;
        }
    }
    close(READ);

    waitpid($pid, 0);
    $error = $? >> 8;
    alarm(0);
};
my $launch_error = $@;
# The eval may have died before reaching its own alarm(0) (e.g. unreadable
# oarexec file, open2 failure). The alarm must not stay armed: its handler
# dies, which would abort the end of job processing below.
alarm(0);
if ($launch_error){
    oar_error("[bipbip $Job_id] oarexec launch aborted: $launch_error");
}

if (($Detach_oarexec == 1) and ($error == 0)){
    oar_debug("[bipbip $Job_id] Exit from bipbip normally\n");
}else{
    if ($init_done == 0){
        # oarexec never finished its initialization: warn the oarsub client
        if (($job->{job_type} eq "INTERACTIVE") and ($job->{reservation} eq "None")){
            my ($addr,$port) = split(/:/,$job->{info_type});
            OAR::Tools::notify_tcp_socket($addr,$port,"ERROR: an error occured on the first job node");
        }
    }
    $base = OAR::IO::connect() if ($Detach_oarexec == 0);
    OAR::IO::check_end_of_job($base,$Job_id,$exit_script_value, $error,\@hosts,$Remote_host,$Remote_port,$job->{job_user},$job->{launching_directory},$Server_epilogue);
}

OAR::IO::disconnect($base);

exit(0);

