package org.elasticsearch.xpack.ml.job.process.autodetect;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter;
import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;

/* loaded from: input_file:org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.class */
class NativeAutodetectProcess implements AutodetectProcess {
    private static final Logger LOGGER = Loggers.getLogger((Class<?>) NativeAutodetectProcess.class);
    private static final Duration WAIT_FOR_KILL_TIMEOUT = Duration.ofMillis(1000);
    private final String jobId;
    private final CppLogMessageHandler cppLogHandler;
    private final OutputStream processInStream;
    private final InputStream processOutStream;
    private final OutputStream processRestoreStream;
    private final LengthEncodedWriter recordWriter;
    private final ZonedDateTime startTime = ZonedDateTime.now();
    private final int numberOfAnalysisFields;
    private final List<Path> filesToDelete;
    private final Runnable onProcessCrash;
    private volatile Future<?> logTailFuture;
    private volatile Future<?> stateProcessorFuture;
    private volatile boolean processCloseInitiated;
    private volatile boolean processKilled;
    private volatile boolean isReady;
    private final AutodetectResultsParser resultsParser;

    /* JADX INFO: Access modifiers changed from: package-private */
    public NativeAutodetectProcess(String str, InputStream inputStream, OutputStream outputStream, InputStream inputStream2, OutputStream outputStream2, int i, List<Path> list, AutodetectResultsParser autodetectResultsParser, Runnable runnable) {
        this.jobId = str;
        this.cppLogHandler = new CppLogMessageHandler(str, inputStream);
        this.processInStream = new BufferedOutputStream(outputStream);
        this.processOutStream = inputStream2;
        this.processRestoreStream = outputStream2;
        this.recordWriter = new LengthEncodedWriter(this.processInStream);
        this.numberOfAnalysisFields = i;
        this.filesToDelete = list;
        this.resultsParser = autodetectResultsParser;
        this.onProcessCrash = (Runnable) Objects.requireNonNull(runnable);
    }

    public void start(ExecutorService executorService, StateProcessor stateProcessor, InputStream inputStream) {
        this.logTailFuture = executorService.submit(() -> {
            try {
                try {
                    CppLogMessageHandler cppLogMessageHandler = this.cppLogHandler;
                    Throwable th = null;
                    try {
                        try {
                            cppLogMessageHandler.tailStream();
                            if (cppLogMessageHandler != null) {
                                if (0 != 0) {
                                    try {
                                        cppLogMessageHandler.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    cppLogMessageHandler.close();
                                }
                            }
                            if (this.processCloseInitiated || this.processKilled) {
                                return;
                            }
                            LOGGER.error("[{}] autodetect process stopped unexpectedly", this.jobId);
                            this.onProcessCrash.run();
                        } catch (Throwable th3) {
                            th = th3;
                            throw th3;
                        }
                    } catch (Throwable th4) {
                        if (cppLogMessageHandler != null) {
                            if (th != null) {
                                try {
                                    cppLogMessageHandler.close();
                                } catch (Throwable th5) {
                                    th.addSuppressed(th5);
                                }
                            } else {
                                cppLogMessageHandler.close();
                            }
                        }
                        throw th4;
                    }
                } catch (Throwable th6) {
                    if (!this.processCloseInitiated && !this.processKilled) {
                        LOGGER.error("[{}] autodetect process stopped unexpectedly", this.jobId);
                        this.onProcessCrash.run();
                    }
                    throw th6;
                }
            } catch (IOException e) {
                if (!this.processKilled) {
                    LOGGER.error((Message) new ParameterizedMessage("[{}] Error tailing autodetect process logs", this.jobId), (Throwable) e);
                }
                if (this.processCloseInitiated || this.processKilled) {
                    return;
                }
                LOGGER.error("[{}] autodetect process stopped unexpectedly", this.jobId);
                this.onProcessCrash.run();
            }
        });
        this.stateProcessorFuture = executorService.submit(() -> {
            Throwable th = null;
            try {
                try {
                    try {
                        stateProcessor.process(this.jobId, inputStream);
                        if (!this.processKilled) {
                            LOGGER.info("[{}] State output finished", this.jobId);
                        }
                        if (inputStream != null) {
                            if (0 != 0) {
                                try {
                                    inputStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                inputStream.close();
                            }
                        }
                    } finally {
                    }
                } catch (IOException e) {
                    if (this.processKilled) {
                        return;
                    }
                    LOGGER.error((Message) new ParameterizedMessage("[{}] Error reading autodetect state output", this.jobId), (Throwable) e);
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        });
    }

    @Override // org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess
    public void restoreState(StateStreamer stateStreamer, ModelSnapshot modelSnapshot) {
        if (modelSnapshot != null) {
            try {
                OutputStream outputStream = this.processRestoreStream;
                Throwable th = null;
                try {
                    stateStreamer.restoreStateToStream(this.jobId, modelSnapshot, outputStream);
                    if (outputStream != null) {
                        if (0 != 0) {
                            try {
                                outputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            outputStream.close();
                        }
                    }
                } finally {
                }
            } catch (Exception e) {
                if (!this.processKilled) {
                    LOGGER.error("Error restoring model state for job " + this.jobId, (Throwable) e);
                }
            }
        }
        this.isReady = true;
    }

    @Override // org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess
    public boolean isReady() {
        return this.isReady;
    }

    @Override // org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess
    public void writeRecord(String[] strArr) throws IOException {
        this.recordWriter.writeRecord(strArr);
    }

    @Override // org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess
    public void writeResetBucketsControlMessage(DataLoadParams dataLoadParams) throws IOException {
        new ControlMsgToProcessWriter(this.recordWriter, this.numberOfAnalysisFields).writeResetBucketsMessage(dataLoadParams);
    }

    @Override // org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess
    public void writeUpdateModelPlotMessage(ModelPlotConfig modelPlotConfig) throws IOException {
        new ControlMsgToProcessWriter(this.recordWriter, this.numberOfAnalysisFields).writeUpdateModelPlotMessage(modelPlotConfig);
    }

    @Override // org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess
    public void writeUpdateDetectorRulesMessage(int i, List<DetectionRule> list) throws IOException {
        new ControlMsgToProcessWriter(this.recordWriter, this.numberOfAnalysisFields).writeUpdateDetectorRulesMessage(i, list);
    }

    @Override // org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess
    public String flushJob(FlushJobParams flushJobParams) throws IOException {
        ControlMsgToProcessWriter controlMsgToProcessWriter = new ControlMsgToProcessWriter(this.recordWriter, this.numberOfAnalysisFields);
        controlMsgToProcessWriter.writeFlushControlMessage(flushJobParams);
        return controlMsgToProcessWriter.writeFlushMessage();
    }

    @Override // org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess
    public void flushStream() throws IOException {
        this.recordWriter.flush();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.processCloseInitiated = true;
            this.processInStream.close();
            if (this.stateProcessorFuture != null) {
                this.stateProcessorFuture.get(MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES);
            }
            if (this.logTailFuture != null) {
                this.logTailFuture.get(5L, TimeUnit.SECONDS);
            }
            if (this.cppLogHandler.seenFatalError()) {
                throw ExceptionsHelper.serverError(this.cppLogHandler.getErrors());
            }
            LOGGER.debug("[{}] Autodetect process exited", this.jobId);
        } catch (InterruptedException e) {
            LOGGER.warn((Message) new ParameterizedMessage("[{}] Exception closing the running autodetect process", this.jobId), (Throwable) e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e2) {
            LOGGER.warn((Message) new ParameterizedMessage("[{}] Exception closing the running autodetect process", this.jobId), e2);
        } finally {
            deleteAssociatedFiles();
        }
    }

    @Override // org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess
    public void kill() throws IOException {
        this.processKilled = true;
        try {
            try {
                NativeControllerHolder.getNativeController().killProcess(this.cppLogHandler.getPid(Duration.ZERO));
                this.cppLogHandler.waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT);
            } catch (TimeoutException e) {
                LOGGER.warn("[{}] Failed to get PID of autodetect process to kill", this.jobId);
                try {
                    this.processInStream.close();
                } catch (IOException e2) {
                }
                try {
                    deleteAssociatedFiles();
                } catch (IOException e3) {
                }
            }
        } finally {
            try {
                this.processInStream.close();
            } catch (IOException e4) {
            }
            try {
                deleteAssociatedFiles();
            } catch (IOException e5) {
            }
        }
    }

    private synchronized void deleteAssociatedFiles() throws IOException {
        if (this.filesToDelete == null) {
            return;
        }
        for (Path path : this.filesToDelete) {
            if (Files.deleteIfExists(path)) {
                LOGGER.debug("[{}] Deleted file {}", this.jobId, path.toString());
            } else {
                LOGGER.warn("[{}] Failed to delete file {}", this.jobId, path.toString());
            }
        }
        this.filesToDelete.clear();
    }

    @Override // org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess
    public Iterator<AutodetectResult> readAutodetectResults() {
        return this.resultsParser.parseResults(this.processOutStream);
    }

    @Override // org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess
    public ZonedDateTime getProcessStartTime() {
        return this.startTime;
    }

    @Override // org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess
    public boolean isProcessAlive() {
        return !this.cppLogHandler.hasLogStreamEnded();
    }

    @Override // org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess
    public String readError() {
        return this.cppLogHandler.getErrors();
    }
}
