package org.voltdb.rejoin;

import com.google_voltpatches.common.base.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.voltcore.logging.VoltLogger;
import org.voltcore.messaging.Mailbox;
import org.voltcore.messaging.VoltMessage;
import org.voltdb.exceptions.SerializableException;
import org.voltdb.rejoin.StreamSnapshotBase;

/* loaded from: input_file:org/voltdb/rejoin/StreamSnapshotAckReceiver.class */
public class StreamSnapshotAckReceiver implements Runnable {
    private static final VoltLogger rejoinLog = new VoltLogger("REJOIN");
    private final Mailbox m_mb;
    private final StreamSnapshotBase.MessageFactory m_msgFactory;
    private final Map<Long, AckCallback> m_callbacks;
    private final AtomicInteger m_expectedEOFs;
    private volatile boolean stopped;
    private volatile Thread m_thread;

    /* loaded from: input_file:org/voltdb/rejoin/StreamSnapshotAckReceiver$AckCallback.class */
    public interface AckCallback {
        void receiveAck(int i);

        void receiveError(Exception exc);
    }

    public StreamSnapshotAckReceiver(Mailbox mailbox) {
        this(mailbox, new StreamSnapshotBase.DefaultMessageFactory());
    }

    public StreamSnapshotAckReceiver(Mailbox mailbox, StreamSnapshotBase.MessageFactory messageFactory) {
        this.stopped = false;
        Preconditions.checkArgument(mailbox != null);
        this.m_mb = mailbox;
        this.m_msgFactory = messageFactory;
        this.m_callbacks = Collections.synchronizedMap(new HashMap());
        this.m_expectedEOFs = new AtomicInteger();
    }

    public void setCallback(long j, AckCallback ackCallback, int i) {
        this.m_expectedEOFs.addAndGet(i);
        this.m_callbacks.put(Long.valueOf(j), ackCallback);
    }

    public void forceStop() {
        this.stopped = true;
        Thread thread = this.m_thread;
        if (thread != null) {
            thread.interrupt();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        this.m_thread = Thread.currentThread();
        rejoinLog.trace("Starting ack receiver thread");
        while (!this.stopped) {
            try {
                try {
                    rejoinLog.trace("Blocking on receiving mailbox");
                    VoltMessage recvBlocking = this.m_mb.recvBlocking(600000L);
                    if (this.stopped) {
                        rejoinLog.debug("Ack receiver thread stopped");
                        this.m_thread = null;
                        rejoinLog.trace("Ack receiver thread exiting");
                        return;
                    }
                    if (recvBlocking == null) {
                        rejoinLog.warn("No stream snapshot ack message was received in the past 10 minutes or the thread was interrupted (expected eofs: " + this.m_expectedEOFs.get() + ")");
                    } else if (!StreamSnapshotDataTarget.m_rejoinDeathTestMode || this.m_msgFactory.getAckTargetId(recvBlocking) != 1) {
                        SerializableException exception = this.m_msgFactory.getException(recvBlocking);
                        if (exception != null) {
                            handleException("Received exception in ack receiver", exception);
                            this.m_thread = null;
                            rejoinLog.trace("Ack receiver thread exiting");
                            return;
                        }
                        AckCallback ackCallback = this.m_callbacks.get(Long.valueOf(this.m_msgFactory.getAckTargetId(recvBlocking)));
                        if (ackCallback == null) {
                            rejoinLog.warn("Unknown target ID " + this.m_msgFactory.getAckTargetId(recvBlocking) + " in stream snapshot ack message");
                        } else {
                            int ackBlockIndex = this.m_msgFactory.getAckBlockIndex(recvBlocking);
                            if (ackBlockIndex != -1) {
                                ackCallback.receiveAck(ackBlockIndex);
                            }
                            if (this.m_msgFactory.isAckEOS(recvBlocking) && this.m_expectedEOFs.decrementAndGet() == 0) {
                                this.m_thread = null;
                                rejoinLog.trace("Ack receiver thread exiting");
                                return;
                            }
                        }
                    }
                } catch (Exception e) {
                    handleException("Error reading a message from a recovery stream", e);
                    this.m_thread = null;
                    rejoinLog.trace("Ack receiver thread exiting");
                    return;
                }
            } catch (Throwable th) {
                this.m_thread = null;
                rejoinLog.trace("Ack receiver thread exiting");
                throw th;
            }
        }
        rejoinLog.debug("Ack receiver thread stopped");
        this.m_thread = null;
        rejoinLog.trace("Ack receiver thread exiting");
    }

    private void handleException(String str, Exception exc) {
        rejoinLog.error(str, exc);
        this.m_callbacks.values().forEach(ackCallback -> {
            ackCallback.receiveError(exc);
        });
    }

    public boolean isStopped() {
        return this.stopped;
    }
}
