package org.elasticsearch.xpack.ml.job.process.logging;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;

/* loaded from: input_file:org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.class */
public class CppLogMessageHandler implements Closeable {
    /** Logger through which the parsed C++ messages are re-emitted. */
    private static final Logger LOGGER = Loggers.getLogger((Class<?>) CppLogMessageHandler.class);
    // Not referenced by the code visible here; the value equals the read-buffer size literal passed by the public constructor.
    private static final int DEFAULT_READBUF_SIZE = 1024;
    // Not referenced by the code visible here; the value equals the error-store size literal passed by the public constructor.
    private static final int DEFAULT_ERROR_STORE_SIZE = 5;
    /** Similar consecutive messages are summarized only while fewer than this many seconds separate them from the first of the run. */
    private static final long MAX_MESSAGE_INTERVAL_SECONDS = 10;
    /** Optional identifier used as a log prefix; log lines omit the prefix when this is null. */
    private final String jobId;
    /** Stream the log messages are read from; closed by {@link #close()}. */
    private final InputStream inputStream;
    /** Size in bytes of each buffer allocated for a read from the stream. */
    private final int readBufSize;
    /** Maximum number of messages kept in {@link #errorStore}; a non-positive value disables storing. */
    private final int errorStoreSize;
    /** Most recent error-or-worse messages, oldest first. */
    private final Deque<String> errorStore;
    /** Counted down when a message reports a PID different from the one currently recorded. */
    private final CountDownLatch pidLatch;
    /** Counted down when the first message containing "Copyright" has been recorded. */
    private final CountDownLatch cppCopyrightLatch;
    /** Counted down when {@link #tailStream()} finishes, normally or exceptionally. */
    private final CountDownLatch logStreamClosedLatch;
    // Tracks the current run of similar messages; not volatile - presumably only touched by the thread running tailStream(), TODO confirm.
    private MessageSummary lastMessageSummary;
    /** Set once a message at FATAL level (or more specific) has been parsed. */
    private volatile boolean seenFatalError;
    /** PID reported by the most recently parsed message; 0 until a message has reported one. */
    private volatile long pid;
    /** First parsed message text containing "Copyright"; null until one has been seen. */
    private volatile String cppCopyright;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler$MessageSummary.class */
    /**
     * Mutable record of the current run of similar log messages: when the run started, the most
     * recent message in it, how many messages were folded into it, and the level to re-log it at.
     */
    public static class MessageSummary {
        Instant timestamp;
        CppLogMessage message;
        int count;
        Level level;

        /** Creates an empty summary: epoch timestamp, no message, zero count and level OFF. */
        MessageSummary() {
            this.timestamp = Instant.EPOCH;
            this.message = null;
            this.count = 0;
            this.level = Level.OFF;
        }

        /**
         * Starts a new run.
         *
         * @param instant timestamp of the message that starts the run
         * @param cppLogMessage the message that starts the run
         * @param level level at which the run is logged
         */
        void reset(Instant instant, CppLogMessage cppLogMessage, Level level) {
            this.count = 0;
            this.level = level;
            this.message = cppLogMessage;
            this.timestamp = instant;
        }
    }

    /**
     * Creates a handler with the default read-buffer size and error-store size.
     *
     * @param str job identifier used as a log prefix; may be null, in which case no prefix is logged
     * @param inputStream stream to read the log messages from; must not be null
     */
    public CppLogMessageHandler(String str, InputStream inputStream) {
        this(inputStream, str, DEFAULT_READBUF_SIZE, DEFAULT_ERROR_STORE_SIZE);
    }

    /**
     * Creates a handler with explicit buffer and error-store sizes.
     *
     * @param inputStream stream to read the log messages from; must not be null
     * @param str job identifier used as a log prefix; may be null, in which case no prefix is logged
     * @param i size in bytes of the buffer used for each read; must be positive
     * @param i2 maximum number of error messages retained; a non-positive value disables retention
     * @throws NullPointerException if {@code inputStream} is null
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    CppLogMessageHandler(InputStream inputStream, String str, int i, int i2) {
        this.inputStream = Objects.requireNonNull(inputStream, "inputStream");
        // A zero-length buffer makes InputStream.read(byte[]) return 0 forever, so tailStream() would never
        // see end-of-stream; a negative size would only fail later when the buffer is allocated.
        if (i <= 0) {
            throw new IllegalArgumentException("read buffer size must be positive but was [" + i + "]");
        }
        this.lastMessageSummary = new MessageSummary();
        this.jobId = str;
        this.readBufSize = i;
        this.errorStoreSize = i2;
        this.errorStore = ConcurrentCollections.newDeque();
        this.pidLatch = new CountDownLatch(1);
        this.cppCopyrightLatch = new CountDownLatch(1);
        this.logStreamClosedLatch = new CountDownLatch(1);
    }

    /**
     * Closes the underlying log input stream.
     *
     * @throws IOException if closing the stream fails
     */
    @Override
    public void close() throws IOException {
        inputStream.close();
    }

    /**
     * Reads the input stream until end-of-stream, parsing and re-logging every separator-delimited
     * message it contains. Blocks the calling thread for as long as the stream stays open.
     *
     * <p>When the method finishes, normally or exceptionally, the stream-closed latch is counted
     * down, any bytes left after the last separator are parsed as one final message, and a pending
     * summary of repeated messages is flushed.
     *
     * @throws IOException if reading from the stream fails
     */
    public void tailStream() throws IOException {
        XContent xContent = XContentFactory.xContent(XContentType.JSON);
        BytesReference pending = null;
        try {
            byte[] readBuf = new byte[readBufSize];
            for (int bytesRead = inputStream.read(readBuf); bytesRead != -1; bytesRead = inputStream.read(readBuf)) {
                BytesArray chunk = new BytesArray(readBuf, 0, bytesRead);
                BytesReference combined = pending == null ? chunk : new CompositeBytesReference(pending, chunk);
                // Cleared first so that, if parsing throws, the finally block does not re-parse bytes
                // that were already handed to parseMessages as part of 'combined'.
                pending = null;
                pending = parseMessages(xContent, combined);
                // The chunk (and any remainder sliced from it) wraps the array it was read into,
                // so every read needs a fresh array.
                readBuf = new byte[readBufSize];
            }
        } finally {
            logStreamClosedLatch.countDown();
            try {
                // Output that was not terminated by a separator (e.g. the process died mid-write)
                // would otherwise be dropped silently.
                if (pending != null) {
                    parseMessage(xContent, pending);
                }
            } catch (RuntimeException e) {
                // Best effort only: trailing output may be truncated, and this must not mask an
                // exception that is already propagating out of the read loop.
                if (jobId != null) {
                    LOGGER.warn(new ParameterizedMessage("[{}] Failed to parse trailing C++ log output: {}", jobId, pending.utf8ToString()), e);
                } else {
                    LOGGER.warn(new ParameterizedMessage("Failed to parse trailing C++ log output: {}", pending.utf8ToString()), e);
                }
            } finally {
                if (lastMessageSummary.count > 0) {
                    logSummarizedMessage();
                }
            }
        }
    }

    /**
     * Reports whether {@link #tailStream()} has finished.
     *
     * @return true once the stream-closed latch has been counted down
     */
    public boolean hasLogStreamEnded() {
        return logStreamClosedLatch.getCount() == 0L;
    }

    /**
     * Reports whether a message at FATAL level has been parsed so far.
     *
     * @return the current value of the fatal-error flag
     */
    public boolean seenFatalError() {
        return seenFatalError;
    }

    /**
     * Waits for {@link #tailStream()} to finish.
     *
     * @param duration maximum time to wait
     * @return true if the stream ended within the timeout; false if the wait timed out or the
     *         calling thread was interrupted (the interrupt flag is restored in that case)
     */
    public boolean waitForLogStreamClose(Duration duration) {
        boolean closed = false;
        try {
            closed = logStreamClosedLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return closed;
    }

    /**
     * Returns the PID reported in the log messages, waiting for the first message if none has
     * been parsed yet.
     *
     * @param duration maximum time to wait for a PID to become available
     * @return the recorded PID
     * @throws TimeoutException if no PID is known after waiting; this is also thrown when the wait
     *         is cut short by an interrupt (the interrupt flag is restored)
     */
    public long getPid(Duration duration) throws TimeoutException {
        // Fast path: a PID has already been recorded.
        if (pid != 0) {
            return pid;
        }
        try {
            pidLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (pid == 0) {
            throw new TimeoutException("Timed out waiting for C++ process PID");
        }
        return pid;
    }

    /**
     * Returns the copyright message logged by the process, waiting for it if it has not been
     * seen yet.
     *
     * @param duration maximum time to wait for the copyright message
     * @return the text of the first parsed message containing "Copyright"
     * @throws TimeoutException if no copyright message is known after waiting; this is also thrown
     *         when the wait is cut short by an interrupt (the interrupt flag is restored)
     */
    public String getCppCopyright(Duration duration) throws TimeoutException {
        // Fast path: the copyright message has already been recorded.
        if (cppCopyright != null) {
            return cppCopyright;
        }
        try {
            cppCopyrightLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (cppCopyright == null) {
            throw new TimeoutException("Timed out waiting for C++ process copyright");
        }
        return cppCopyright;
    }

    /**
     * Returns the stored error messages, oldest first.
     *
     * @return every stored message followed by a newline, concatenated; empty if nothing is stored
     */
    public String getErrors() {
        // Work on a snapshot so the result is built from one traversal of the store.
        final String[] snapshot = errorStore.toArray(new String[0]);
        final StringBuilder joined = new StringBuilder();
        for (int idx = 0; idx < snapshot.length; idx++) {
            joined.append(snapshot[idx]);
            joined.append('\n');
        }
        return joined.toString();
    }

    /**
     * Parses every complete, separator-terminated message in the given bytes.
     *
     * @param xContent content type supplying the stream separator and the parser
     * @param buffered bytes read so far that have not been parsed yet
     * @return the bytes following the last separator, or null if nothing is left over
     */
    private BytesReference parseMessages(XContent xContent, BytesReference buffered) {
        final byte separator = xContent.streamSeparator();
        int start = 0;
        for (int marker = findNextMarker(separator, buffered, start);
                marker != -1;
                marker = findNextMarker(separator, buffered, start)) {
            // Two adjacent separators enclose nothing, so there is no message to parse.
            if (marker > start) {
                parseMessage(xContent, buffered.slice(start, marker - start));
            }
            start = marker + 1;
        }
        final int remaining = buffered.length() - start;
        return remaining > 0 ? buffered.slice(start, remaining) : null;
    }

    /**
     * Parses one serialized log message, records its side information and re-logs it.
     *
     * <p>Side effects, in order: messages at ERROR level or more specific are added to the error
     * store, and FATAL ones also set the fatal-error flag; a PID different from the recorded one
     * replaces it and releases the PID latch; the first message containing "Copyright" is recorded
     * and releases the copyright latch.
     *
     * <p>Unless debug logging is enabled, a message similar to the previous one that arrives fewer
     * than {@link #MAX_MESSAGE_INTERVAL_SECONDS} seconds after the start of the current run is only
     * counted, not logged; the run is logged as a summary when a different message arrives.
     *
     * @param xContent content type used to create the parser
     * @param bytesReference bytes of exactly one message, without the separator
     */
    private void parseMessage(XContent xContent, BytesReference bytesReference) {
        // try-with-resources: the parser was previously never closed.
        try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, bytesReference)) {
            CppLogMessage msg = CppLogMessage.PARSER.apply2(parser, (XContentParser) null);
            Level level = Level.getLevel(msg.getLevel());
            if (level == null) {
                // Unknown level name: fall back to WARN rather than dropping the message.
                level = Level.WARN;
            } else if (level.isMoreSpecificThan(Level.ERROR)) {
                storeError(msg.getMessage());
                if (level.isMoreSpecificThan(Level.FATAL)) {
                    seenFatalError = true;
                }
            }
            long latestPid = msg.getPid();
            if (pid != latestPid) {
                pid = latestPid;
                pidLatch.countDown();
            }
            String text = msg.getMessage();
            // storeError tolerates a null message, so guard the copyright check against one as well.
            if (cppCopyright == null && text != null && text.contains("Copyright")) {
                cppCopyright = text;
                cppCopyrightLatch.countDown();
            }
            if (!LOGGER.isEnabled(level)) {
                return;
            }
            if (!LOGGER.isDebugEnabled()) {
                boolean continuesRun = msg.isSimilarTo(lastMessageSummary.message)
                        && lastMessageSummary.timestamp.until(msg.getTimestamp(), ChronoUnit.SECONDS) < MAX_MESSAGE_INTERVAL_SECONDS;
                if (continuesRun) {
                    // Fold into the current run; it is logged later by logSummarizedMessage().
                    lastMessageSummary.count++;
                    lastMessageSummary.message = msg;
                    return;
                }
                if (lastMessageSummary.count > 0) {
                    logSummarizedMessage();
                }
                lastMessageSummary.reset(msg.getTimestamp(), msg, level);
            }
            if (jobId != null) {
                LOGGER.log(level, "[{}] [{}/{}] [{}@{}] {}", jobId, msg.getLogger(), Long.valueOf(latestPid), msg.getFile(), Long.valueOf(msg.getLine()), text);
            } else {
                LOGGER.log(level, "[{}/{}] [{}@{}] {}", msg.getLogger(), Long.valueOf(latestPid), msg.getFile(), Long.valueOf(msg.getLine()), text);
            }
        } catch (IOException e) {
            if (jobId != null) {
                LOGGER.warn(new ParameterizedMessage("[{}] Failed to parse C++ log message: {}", jobId, bytesReference.utf8ToString()), e);
            } else {
                LOGGER.warn(new ParameterizedMessage("Failed to parse C++ log message: {}", bytesReference.utf8ToString()), e);
            }
        }
    }

    /**
     * Logs the current run of similar messages at the run's level. A run with a count above one
     * is logged with a "repeated [n]" suffix; otherwise the last message is logged as-is. The job
     * id prefix is included only when a job id is set.
     */
    private void logSummarizedMessage() {
        final MessageSummary summary = lastMessageSummary;
        final CppLogMessage last = summary.message;
        final boolean repeated = summary.count > 1;
        if (jobId != null) {
            if (repeated) {
                LOGGER.log(summary.level, "[{}] [{}/{}] [{}@{}] {} | repeated [{}]", jobId, last.getLogger(), Long.valueOf(last.getPid()), last.getFile(), Long.valueOf(last.getLine()), last.getMessage(), Integer.valueOf(summary.count));
            } else {
                LOGGER.log(summary.level, "[{}] [{}/{}] [{}@{}] {}", jobId, last.getLogger(), Long.valueOf(last.getPid()), last.getFile(), Long.valueOf(last.getLine()), last.getMessage());
            }
        } else {
            if (repeated) {
                LOGGER.log(summary.level, "[{}/{}] [{}@{}] {} | repeated [{}]", last.getLogger(), Long.valueOf(last.getPid()), last.getFile(), Long.valueOf(last.getLine()), last.getMessage(), Integer.valueOf(summary.count));
            } else {
                LOGGER.log(summary.level, "[{}/{}] [{}@{}] {}", last.getLogger(), Long.valueOf(last.getPid()), last.getFile(), Long.valueOf(last.getLine()), last.getMessage());
            }
        }
    }

    private void storeError(String str) {
        if (Strings.isNullOrEmpty(str) || this.errorStoreSize <= 0) {
            return;
        }
        if (this.errorStore.size() >= this.errorStoreSize) {
            this.errorStore.removeFirst();
        }
        this.errorStore.offerLast(str);
    }

    /**
     * Finds the first occurrence of a marker byte at or after a given offset.
     *
     * @param marker the byte to look for
     * @param bytes the bytes to search
     * @param from index at which the search starts
     * @return index of the first matching byte, or -1 if there is none
     */
    private static int findNextMarker(byte marker, BytesReference bytes, int from) {
        final int end = bytes.length();
        int pos = from;
        while (pos < end) {
            if (bytes.get(pos) == marker) {
                return pos;
            }
            pos++;
        }
        return -1;
    }
}
