package org.voltdb.rejoin;

import com.google_voltpatches.common.base.Preconditions;
import com.google_voltpatches.common.base.Throwables;
import com.google_voltpatches.common.primitives.Longs;
import com.google_voltpatches.common.util.concurrent.Futures;
import com.google_voltpatches.common.util.concurrent.ListenableFuture;
import com.google_voltpatches.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.voltcore.logging.VoltLogger;
import org.voltcore.messaging.Mailbox;
import org.voltcore.utils.CoreUtils;
import org.voltcore.utils.DBBPool;
import org.voltcore.utils.EstTimeUpdater;
import org.voltcore.utils.Pair;
import org.voltdb.SnapshotDataTarget;
import org.voltdb.SnapshotFormat;
import org.voltdb.SnapshotTableInfo;
import org.voltdb.VoltDB;
import org.voltdb.rejoin.StreamSnapshotAckReceiver;
import org.voltdb.rejoin.StreamSnapshotBase;
import org.voltdb.utils.CompressionService;

/* loaded from: input_file:org/voltdb/rejoin/StreamSnapshotDataTarget.class */
public class StreamSnapshotDataTarget extends StreamSnapshotBase implements SnapshotDataTarget, StreamSnapshotAckReceiver.AckCallback {
    /** Shared logger; bound to the "REJOIN" logger name in the static initializer. */
    private static final VoltLogger rejoinLog;
    /** Set in the static initializer: true when the system property "rejoindeathtest" is present. */
    static boolean m_rejoinDeathTestMode;
    /** Class-wide counter from which each new target draws its {@link #m_targetId}. */
    private static AtomicLong m_totalSnapshotTargetCount;
    /** Identifier of this target, taken from {@link #m_totalSnapshotTargetCount} in the constructor. */
    final long m_targetId;
    /** Write timeout used by the constructor overload that takes no explicit timeout. */
    public static final long DEFAULT_WRITE_TIMEOUT_MS;
    /** Watchdog period in seconds; the watchdog reschedules itself every 5 seconds. */
    static final long WATCHDOG_PERIOD_S = 5;
    /** Byte offset 13: where getInContainerRowCount reads the row count (header size 9 plus one skipped int). */
    static final int ROW_COUNT_OFFSET = 13;
    /** 17 bytes: the size of the header-only buffer SendWork.doWork builds for replicated DATA blocks. */
    static final int DATA_HEADER_BYTES = 17;
    /** Table id to (replicated flag, schema bytes); write() replaces the schema bytes with null once sent. */
    private final Map<Integer, Pair<Boolean, byte[]>> m_schemas;
    /** HSId of the sending site (constructor argument; used in log messages). */
    private final long m_srcHSId;
    /** HSId of the destination site all work for this target is sent to. */
    private final long m_destHSId;
    /** Copy of the constructor's HSId set with {@link #m_destHSId} removed; extra recipients for replicated blocks. */
    private final Set<Long> m_otherDestHostHSIds;
    /** Constructor flag; decides whether the end-of-stream block is sent as replicated. */
    private final boolean m_replicatedTableTarget;
    /** Sender whose queue receives every SendWork created by this target. */
    private final SnapshotSender m_sender;
    /** Ack receiver this target registers itself with as callback. */
    private final StreamSnapshotAckReceiver m_ackReceiver;
    /** First failure recorded through setWriteFailed; null while the stream is healthy. */
    private final AtomicReference<Exception> m_writeFailed;
    /** Set once write() has returned the recorded failure; later failed writes return null instead. */
    private boolean m_failureReported;
    /** Last exception passed to reportSerializationFailure; rethrown by close(). */
    private volatile IOException m_reportedSerializationFailure;
    /** Number of blocks sent but not yet fully acknowledged. */
    final AtomicInteger m_outstandingWorkCount;
    /** Unacknowledged work keyed by block index; the first entry is the oldest block. */
    private final TreeMap<Integer, SendWork> m_outstandingWork;
    /** Index that will be assigned to the next block. */
    int m_blockIndex;
    /** Optional callback run at the end of every close() call. */
    private final AtomicReference<Runnable> m_onCloseHandler;
    /** Optional callback run by trackProgress(); null until setInProgressHandler is called. */
    private Runnable m_progressHandler;
    /** Becomes true once close() has sent the end-of-stream block. */
    private final AtomicBoolean m_closed;
    /** Decompiler rendering of the assertion-status flag; assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/voltdb/rejoin/StreamSnapshotDataTarget$SendWork.class */
    /**
     * One unit of work for the sender thread: either a real message to
     * transmit, or an "empty" marker (see the no-arg constructor) that the
     * sender treats as an end-of-queue signal for one data target.
     */
    public static class SendWork {
        /** Payload to send; set to null once discarded. */
        DBBPool.BBContainer m_message;
        final StreamSnapshotMessageType m_type;
        final long m_targetId;
        final long m_destHSId;
        /** Additional recipients, or null when only {@link #m_destHSId} receives the block. */
        final Set<Long> m_otherDestHSIds;
        /** Number of acks still expected; created by {@link #doWork}. */
        AtomicInteger m_ackCounter;
        /** Creation time in milliseconds, or -1 for the empty marker. */
        final long m_ts;
        final boolean m_isEmpty;
        /** Completed with true after {@link #doWork} attempted the send. */
        final SettableFuture<Boolean> m_future;

        /** Creates the empty marker: no payload, no destination, no future. */
        SendWork() {
            m_isEmpty = true;
            m_type = StreamSnapshotMessageType.DATA;
            m_targetId = -1L;
            m_destHSId = -1L;
            m_otherDestHSIds = null;
            m_ts = -1L;
            m_future = null;
        }

        /**
         * Creates real work.
         *
         * @param streamSnapshotMessageType message type of the payload
         * @param j id of the data target that produced the block
         * @param j2 HSId of the primary destination
         * @param set additional destinations, or null for none
         * @param bBContainer payload; owned by this work until discarded
         * @param settableFuture future completed once the send was attempted
         */
        SendWork(StreamSnapshotMessageType streamSnapshotMessageType, long j, long j2, Set<Long> set, DBBPool.BBContainer bBContainer, SettableFuture<Boolean> settableFuture) {
            m_isEmpty = false;
            m_type = streamSnapshotMessageType;
            m_targetId = j;
            m_destHSId = j2;
            m_otherDestHSIds = set;
            m_message = bBContainer;
            m_ts = System.currentTimeMillis();
            m_future = settableFuture;
        }

        /** Releases the payload container, if it has not been released already. */
        public synchronized void discard() {
            if (m_message == null) {
                return;
            }
            m_message.discard();
            m_message = null;
        }

        /**
         * Compresses the container's buffer and sends it to {@link #m_destHSId}.
         *
         * @return the number of compressed bytes sent
         */
        protected int send(Mailbox mailbox, StreamSnapshotBase.MessageFactory messageFactory, DBBPool.BBContainer bBContainer) throws IOException {
            final ByteBuffer buf = bBContainer.b();
            final byte[] compressed = buf.isDirect()
                    ? CompressionService.compressBuffer(buf)
                    : CompressionService.compressBytes(buf.array(), buf.position(), buf.remaining());
            mailbox.send(m_destHSId, messageFactory.makeDataMessage(m_targetId, compressed));
            return compressed.length;
        }

        /** Compresses {@code buf} and sends it to every HSId in {@link #m_otherDestHSIds}. */
        private void sendReplicatedDataToNonLowestSites(Mailbox mailbox, StreamSnapshotBase.MessageFactory messageFactory, ByteBuffer buf, int length) throws IOException {
            final byte[] compressed;
            if (buf.isDirect()) {
                compressed = CompressionService.compressBuffer(buf);
            } else {
                compressed = CompressionService.compressBytes(buf.array(), 0, length);
            }
            mailbox.send(Longs.toArray(m_otherDestHSIds), messageFactory.makeDataMessage(m_targetId, compressed));
        }

        /**
         * Sends the payload and sets up the ack counter for it.
         *
         * <p>If the payload was already discarded, only the ack counter is
         * initialised and 0 is returned (the future is not touched in that
         * case). Otherwise the future is completed with true whether or not
         * the send threw.
         *
         * @return the compressed byte count sent to the primary destination
         */
        public synchronized int doWork(Mailbox mailbox, StreamSnapshotBase.MessageFactory messageFactory) throws Exception {
            if (m_message == null) {
                m_ackCounter = new AtomicInteger(1);
                return 0;
            }
            try {
                final int bytesSent;
                if (m_otherDestHSIds == null) {
                    m_ackCounter = new AtomicInteger(1);
                    bytesSent = send(mailbox, messageFactory, m_message);
                } else {
                    // One ack from the primary destination plus one per extra recipient.
                    m_ackCounter = new AtomicInteger(m_otherDestHSIds.size() + 1);
                    bytesSent = send(mailbox, messageFactory, m_message);
                    final ByteBuffer payload = m_message.b();
                    if (m_type == StreamSnapshotMessageType.DATA) {
                        // Extra recipients only get the header, with the row count field zeroed.
                        final ByteBuffer headerOnly = ByteBuffer.allocate(StreamSnapshotDataTarget.DATA_HEADER_BYTES);
                        payload.get(headerOnly.array(), 0, StreamSnapshotDataTarget.ROW_COUNT_OFFSET);
                        payload.position(0);
                        headerOnly.putInt(StreamSnapshotDataTarget.ROW_COUNT_OFFSET, 0);
                        sendReplicatedDataToNonLowestSites(mailbox, messageFactory, headerOnly, StreamSnapshotDataTarget.DATA_HEADER_BYTES);
                    } else if (m_type == StreamSnapshotMessageType.END) {
                        sendReplicatedDataToNonLowestSites(mailbox, messageFactory, payload, payload.limit());
                    } else {
                        sendReplicatedDataToNonLowestSites(mailbox, messageFactory, payload, payload.remaining());
                    }
                }
                if (StreamSnapshotDataTarget.rejoinLog.isTraceEnabled()) {
                    StreamSnapshotDataTarget.rejoinLog.trace("Sent " + m_type.name() + " from " + m_targetId + " expected ackCounter " + m_ackCounter + " otherDestHSIds " + m_otherDestHSIds);
                }
                return bytesSent;
            } finally {
                m_future.set(Boolean.TRUE);
            }
        }

        /**
         * Records one ack.
         *
         * @return true when this was the last ack expected for the work
         */
        public boolean receiveAck() {
            return m_ackCounter.decrementAndGet() == 0;
        }
    }

    /* loaded from: input_file:org/voltdb/rejoin/StreamSnapshotDataTarget$SnapshotSender.class */
    /**
     * Runnable that drains a queue of {@link SendWork} and sends each item
     * through a mailbox. One sender can serve several data targets; each
     * target registers itself and later enqueues an empty work item as its
     * end-of-queue marker.
     */
    public static class SnapshotSender implements Runnable {
        private final Mailbox m_mb;
        private final StreamSnapshotBase.MessageFactory m_msgFactory;
        private final LinkedBlockingQueue<SendWork> m_workQueue;
        /** Number of end-of-queue markers still expected before the loop may exit. */
        private final AtomicInteger m_expectedEOFs;
        /** Compressed bytes sent so far, per target id. */
        final Map<Long, AtomicLong> m_bytesSent;
        /** Work items sent so far, per target id. */
        final Map<Long, AtomicLong> m_worksSent;
        /** Most recent exception caught by {@link #run()}; null if none. */
        volatile Exception m_lastException;

        /**
         * Creates a sender using the default message factory.
         *
         * @param mailbox mailbox to send through; must not be null
         */
        public SnapshotSender(Mailbox mailbox) {
            this(mailbox, new StreamSnapshotBase.DefaultMessageFactory());
        }

        /**
         * @param mailbox mailbox to send through; must not be null
         * @param messageFactory factory used to build the outgoing messages
         * @throws IllegalArgumentException if {@code mailbox} is null
         */
        public SnapshotSender(Mailbox mailbox, StreamSnapshotBase.MessageFactory messageFactory) {
            Preconditions.checkArgument(mailbox != null);
            this.m_lastException = null;
            this.m_mb = mailbox;
            this.m_msgFactory = messageFactory;
            this.m_workQueue = new LinkedBlockingQueue<>();
            this.m_expectedEOFs = new AtomicInteger();
            this.m_bytesSent = Collections.synchronizedMap(new HashMap<Long, AtomicLong>());
            this.m_worksSent = Collections.synchronizedMap(new HashMap<Long, AtomicLong>());
        }

        /**
         * Registers a data target: one more end-of-queue marker is expected
         * and fresh byte/work counters are created for the target id.
         *
         * @param j id of the data target
         */
        public void registerDataTarget(long j) {
            this.m_expectedEOFs.incrementAndGet();
            this.m_bytesSent.put(Long.valueOf(j), new AtomicLong());
            this.m_worksSent.put(Long.valueOf(j), new AtomicLong());
        }

        /** Enqueues work for the sender thread. */
        public void offer(SendWork sendWork) {
            this.m_workQueue.offer(sendWork);
        }

        /**
         * Sender loop. Exits when no work arrives for 10 minutes or when the
         * last expected end-of-queue marker has been consumed.
         *
         * <p>The whole iteration sits inside the try block so that a failure
         * of {@code doWork} (which throws a checked exception) is recorded in
         * {@link #m_lastException} and logged, and the loop moves on to the
         * next item. Any exception, including an interrupt while polling, is
         * handled this way; an interrupt therefore does not stop the loop.
         */
        @Override // java.lang.Runnable
        public void run() {
            StreamSnapshotDataTarget.rejoinLog.trace("Starting stream sender thread");
            while (true) {
                try {
                    StreamSnapshotDataTarget.rejoinLog.trace("Blocking on sending work queue");
                    final SendWork work = this.m_workQueue.poll(10L, TimeUnit.MINUTES);
                    if (work == null) {
                        StreamSnapshotDataTarget.rejoinLog.warn("No stream snapshot send work was produced in the past 10 minutes");
                        break;
                    }
                    if (work.m_isEmpty) {
                        // End-of-queue marker from one target; stop only after all of them arrived.
                        if (this.m_expectedEOFs.decrementAndGet() == 0) {
                            break;
                        }
                        continue;
                    }
                    final Long targetId = Long.valueOf(work.m_targetId);
                    this.m_bytesSent.get(targetId).addAndGet(work.doWork(this.m_mb, this.m_msgFactory));
                    this.m_worksSent.get(targetId).incrementAndGet();
                } catch (Exception e) {
                    this.m_lastException = e;
                    StreamSnapshotDataTarget.rejoinLog.error("Error sending a recovery stream message", e);
                }
            }
            CompressionService.releaseThreadLocal();
            StreamSnapshotDataTarget.rejoinLog.trace("Stream sender thread exiting");
        }
    }

    /* loaded from: input_file:org/voltdb/rejoin/StreamSnapshotDataTarget$SnapshotSerializationException.class */
    /** I/O exception type for snapshot serialization failures; carries only a message. */
    public static class SnapshotSerializationException extends IOException {
        private static final long serialVersionUID = 1L;

        /**
         * @param str detail message
         */
        public SnapshotSerializationException(String str) {
            super(str);
        }
    }

    /* loaded from: input_file:org/voltdb/rejoin/StreamSnapshotDataTarget$StreamSnapshotTimeoutException.class */
    /**
     * I/O exception recorded as the write failure when a block stays
     * unacknowledged past the write timeout, or when the destination host is
     * found to be down.
     */
    public static class StreamSnapshotTimeoutException extends IOException {
        private static final long serialVersionUID = 1L;

        /**
         * @param str detail message
         */
        public StreamSnapshotTimeoutException(String str) {
            super(str);
        }
    }

    /* loaded from: input_file:org/voltdb/rejoin/StreamSnapshotDataTarget$Watchdog.class */
    class Watchdog implements Runnable {
        final long m_bytesWrittenSinceConstruction;
        final long m_writeTimeout;
        final long m_lastDataWrite;

        Watchdog(long j, long j2, long j3) {
            this.m_bytesWrittenSinceConstruction = j;
            this.m_writeTimeout = j2;
            this.m_lastDataWrite = j3;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (StreamSnapshotDataTarget.this.m_closed.get()) {
                return;
            }
            boolean z = true;
            try {
                try {
                    int hostIdFromHSId = CoreUtils.getHostIdFromHSId(StreamSnapshotDataTarget.this.m_destHSId);
                    long j = StreamSnapshotDataTarget.this.m_sender.m_bytesSent.get(Long.valueOf(StreamSnapshotDataTarget.this.m_targetId)).get();
                    long j2 = j - this.m_bytesWrittenSinceConstruction;
                    StreamSnapshotDataTarget.rejoinLog.info(String.format("While sending rejoin data from site %s to site %s, %d bytes have been sent in the past %s seconds.", CoreUtils.hsIdToString(StreamSnapshotDataTarget.this.m_srcHSId), CoreUtils.hsIdToString(StreamSnapshotDataTarget.this.m_destHSId), Long.valueOf(j2), 5L));
                    StreamSnapshotDataTarget.this.checkTimeout(this.m_writeTimeout);
                    if (StreamSnapshotDataTarget.this.m_writeFailed.get() != null) {
                        StreamSnapshotDataTarget.this.clearOutstanding();
                        z = false;
                    } else if (j2 == 0) {
                        z = VoltDB.instance().getHostMessenger().getLiveHostIds().contains(Integer.valueOf(hostIdFromHSId));
                        if (!z) {
                            if (StreamSnapshotDataTarget.this.m_writeFailed.get() == null) {
                                StreamSnapshotDataTarget.this.setWriteFailed(new StreamSnapshotTimeoutException("A snapshot write task failed after rejoining node is down."));
                            }
                            StreamSnapshotDataTarget.this.clearOutstanding();
                        }
                    }
                    if (z) {
                        VoltDB.instance().scheduleWork(new Watchdog(j, this.m_writeTimeout, j2 > 0 ? System.currentTimeMillis() : this.m_lastDataWrite), 5L, -1L, TimeUnit.SECONDS);
                    } else {
                        StreamSnapshotDataTarget.rejoinLog.info(String.format("Stop watching stream snapshot from site %s to site %s", CoreUtils.hsIdToString(StreamSnapshotDataTarget.this.m_srcHSId), CoreUtils.hsIdToString(StreamSnapshotDataTarget.this.m_destHSId)));
                    }
                } catch (Throwable th) {
                    StreamSnapshotDataTarget.rejoinLog.error("Stream snapshot watchdog thread threw an exception", th);
                    if (1 != 0) {
                        VoltDB.instance().scheduleWork(new Watchdog(0L, this.m_writeTimeout, 0 > 0 ? System.currentTimeMillis() : this.m_lastDataWrite), 5L, -1L, TimeUnit.SECONDS);
                    } else {
                        StreamSnapshotDataTarget.rejoinLog.info(String.format("Stop watching stream snapshot from site %s to site %s", CoreUtils.hsIdToString(StreamSnapshotDataTarget.this.m_srcHSId), CoreUtils.hsIdToString(StreamSnapshotDataTarget.this.m_destHSId)));
                    }
                }
            } catch (Throwable th2) {
                if (1 != 0) {
                    VoltDB.instance().scheduleWork(new Watchdog(0L, this.m_writeTimeout, 0 > 0 ? System.currentTimeMillis() : this.m_lastDataWrite), 5L, -1L, TimeUnit.SECONDS);
                } else {
                    StreamSnapshotDataTarget.rejoinLog.info(String.format("Stop watching stream snapshot from site %s to site %s", CoreUtils.hsIdToString(StreamSnapshotDataTarget.this.m_srcHSId), CoreUtils.hsIdToString(StreamSnapshotDataTarget.this.m_destHSId)));
                }
                throw th2;
            }
        }
    }

    /**
     * Creates a target that uses {@link #DEFAULT_WRITE_TIMEOUT_MS} as its
     * write timeout; all other arguments are passed through unchanged to the
     * full constructor.
     */
    public StreamSnapshotDataTarget(long j, long j2, boolean z, Set<Long> set, byte[] bArr, List<SnapshotTableInfo> list, SnapshotSender snapshotSender, StreamSnapshotAckReceiver streamSnapshotAckReceiver) {
        this(j,
             j2,
             z,
             set,
             bArr,
             list,
             DEFAULT_WRITE_TIMEOUT_MS,
             snapshotSender,
             streamSnapshotAckReceiver);
    }

    /**
     * Creates a data target, registers it with the sender and the ack
     * receiver, schedules the first watchdog run, and, when hashinator bytes
     * are supplied, queues them as the first block.
     *
     * @param j HSId of the source site
     * @param j2 HSId of the destination site
     * @param z whether this target is the replicated-table target
     * @param set destination HSIds; a copy without {@code j2} becomes the extra-recipient set
     * @param bArr hashinator bytes to send first, or null to send none
     * @param list tables whose schemas may need to be sent
     * @param j3 write timeout in milliseconds given to the watchdog
     * @param snapshotSender sender that transmits the queued work
     * @param streamSnapshotAckReceiver receiver that delivers acks and errors back to this target
     */
    public StreamSnapshotDataTarget(long j, long j2, boolean z, Set<Long> set, byte[] bArr, List<SnapshotTableInfo> list, long j3, SnapshotSender snapshotSender, StreamSnapshotAckReceiver streamSnapshotAckReceiver) {
        m_writeFailed = new AtomicReference<>();
        m_failureReported = false;
        m_reportedSerializationFailure = null;
        m_outstandingWorkCount = new AtomicInteger(0);
        m_outstandingWork = new TreeMap<>();
        m_blockIndex = 0;
        m_onCloseHandler = new AtomicReference<>(null);
        m_progressHandler = null;
        m_closed = new AtomicBoolean(false);

        // Decompiled assertion: fails only with assertions enabled, on a server thread, in death-test mode.
        if (!$assertionsDisabled && VoltDB.instanceOnServerThread() && m_rejoinDeathTestMode) {
            throw new AssertionError();
        }

        m_targetId = m_totalSnapshotTargetCount.getAndIncrement();
        m_schemas = (Map) list.stream().collect(Collectors.toMap(
                SnapshotTableInfo::getTableId,
                info -> Pair.of(Boolean.valueOf(info.isReplicated()), info.getSchema())));
        m_srcHSId = j;
        m_destHSId = j2;
        m_replicatedTableTarget = z;

        m_otherDestHostHSIds = new HashSet<>(set);
        m_otherDestHostHSIds.remove(Long.valueOf(m_destHSId));

        m_sender = snapshotSender;
        m_sender.registerDataTarget(m_targetId);

        m_ackReceiver = streamSnapshotAckReceiver;
        final int expectedAcks = m_replicatedTableTarget ? set.size() : 1;
        m_ackReceiver.setCallback(m_targetId, this, expectedAcks);

        final String lowestSiteTag = z ? " [Lowest Site]" : "";
        rejoinLog.debug(String.format(
                "Initializing snapshot stream processor for src site id : %s, dest site id: %s, and with processorid: %d%s",
                CoreUtils.hsIdToString(m_srcHSId),
                CoreUtils.hsIdToString(m_destHSId),
                Long.valueOf(m_targetId),
                lowestSiteTag));

        VoltDB.instance().scheduleWork(new Watchdog(0L, j3, System.currentTimeMillis()), 5L, -1L, TimeUnit.SECONDS);

        if (bArr != null) {
            send(StreamSnapshotMessageType.HASHINATOR, -1, bArr, false);
        }
    }

    /** @return the replicated-table flag this target was constructed with */
    public boolean isReplicatedTableTarget() {
        return m_replicatedTableTarget;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Marks the stream as failed when the oldest outstanding block has been
     * waiting longer than the given timeout. Does nothing when no work is
     * outstanding.
     *
     * @param j timeout in milliseconds
     */
    public synchronized void checkTimeout(long j) {
        final Map.Entry<Integer, SendWork> oldest = m_outstandingWork.firstEntry();
        if (oldest == null) {
            return;
        }
        final long now = System.currentTimeMillis();
        final long waitedMs = now - oldest.getValue().m_ts;
        if (waitedMs <= j) {
            return;
        }
        final String message = String.format(
                "A snapshot write task failed on site %s after a timeout (currently %d seconds outstanding). Node rejoin may need to be retried",
                CoreUtils.hsIdToString(m_srcHSId),
                Long.valueOf(waitedMs / 1000));
        final StreamSnapshotTimeoutException timeout = new StreamSnapshotTimeoutException(message);
        rejoinLog.error(timeout.getMessage());
        setWriteFailed(timeout);
    }

    /**
     * Discards every outstanding block, resets the outstanding counter to
     * zero and wakes all threads waiting on this target. Returns immediately
     * when there is nothing outstanding.
     */
    synchronized void clearOutstanding() {
        final boolean nothingPending = m_outstandingWork.isEmpty() && m_outstandingWorkCount.get() == 0;
        if (nothingPending) {
            return;
        }
        rejoinLog.trace("Clearing outstanding work.");
        for (SendWork pending : m_outstandingWork.values()) {
            pending.discard();
        }
        m_outstandingWork.clear();
        m_outstandingWorkCount.set(0);
        notifyAll();
    }

    /**
     * Handles an ack for one block. The block is removed and its buffer
     * discarded only when the last expected ack arrives; waiters are notified
     * when no outstanding work remains.
     *
     * @param i index of the acknowledged block
     */
    @Override // org.voltdb.rejoin.StreamSnapshotAckReceiver.AckCallback
    public synchronized void receiveAck(int i) {
        final Integer blockKey = Integer.valueOf(i);
        final SendWork acked = m_outstandingWork.get(blockKey);

        final boolean unknownBlock = acked == null;
        if (unknownBlock || acked.m_ackCounter == null) {
            final String reason = unknownBlock ? " already removed the block." : " ack counter haven't been initialized.";
            rejoinLog.warn("Received invalid blockIndex ack for targetId " + m_targetId + " for index " + String.valueOf(i) + reason);
            return;
        }

        final boolean lastAck = acked.receiveAck();
        if (!lastAck) {
            rejoinLog.trace("Received ack for targetId " + m_targetId + " decrements counter for block index " + String.valueOf(i));
            return;
        }

        rejoinLog.trace("Received ack for targetId " + m_targetId + " removes block for index " + String.valueOf(i));
        final int stillOutstanding = m_outstandingWorkCount.decrementAndGet();
        if (stillOutstanding == 0) {
            notifyAll();
        }
        m_outstandingWork.remove(blockKey);
        acked.discard();
    }

    /**
     * Records an error reported by the ack receiver as this stream's write failure.
     *
     * @param exc the reported error
     */
    @Override // org.voltdb.rejoin.StreamSnapshotAckReceiver.AckCallback
    public synchronized void receiveError(Exception exc) {
        final Exception reported = exc;
        setWriteFailed(reported);
    }

    /**
     * Returns the size in bytes of the block header this target writes in
     * front of the payload: a 1-byte message type, a 4-byte block index and a
     * 4-byte table id, which is the same prefix write() and the byte[] send
     * overload put into the buffer.
     *
     * @return always 9
     */
    @Override // org.voltdb.SnapshotDataTarget
    public int getHeaderSize() {
        return 9;
    }

    /**
     * Queues one block of table data for sending. The table's schema is sent
     * first if it has not been sent yet.
     *
     * <p>The container produced by {@code callable} is owned by this method
     * until it is handed to the outstanding-work queue. On every failure path
     * before that hand-off the container is discarded here, so a failed write
     * does not leak the buffer.
     *
     * @param callable supplier of the data container; the first 9 bytes of
     *        its buffer are overwritten with the block header
     * @param i id of the table the data belongs to
     * @return the future of the queued send; a failed future when the write
     *         fails or the stream has already failed; or {@code null} when
     *         the stream's failure was already reported by an earlier call
     */
    @Override // org.voltdb.SnapshotDataTarget
    public ListenableFuture<?> write(Callable<DBBPool.BBContainer> callable, int i) {
        synchronized (this) {
            rejoinLog.trace("Starting write");
            // Non-null only while this method still owns the container.
            DBBPool.BBContainer owned = null;
            try {
                owned = callable.call();

                // Stream already failed: drop the data and report the failure exactly once.
                if (this.m_writeFailed.get() != null) {
                    if (owned != null) {
                        final DBBPool.BBContainer rejected = owned;
                        owned = null;
                        rejected.discard();
                    }
                    if (this.m_failureReported) {
                        return null;
                    }
                    this.m_failureReported = true;
                    return Futures.immediateFailedFuture(this.m_writeFailed.get());
                }

                // Check for null before touching the container, so the failure has a clear cause.
                if (owned == null) {
                    throw new IOException("Snapshot data supplier returned no container for table " + i);
                }
                if (this.m_closed.get()) {
                    throw new IOException("Trying to write snapshot data after the stream is closed");
                }

                final Pair<Boolean, byte[]> schemaEntry = this.m_schemas.get(Integer.valueOf(i));
                if (schemaEntry == null) {
                    throw new IOException("No schema registered for table " + i);
                }
                final boolean replicated = schemaEntry.getFirst().booleanValue();
                final byte[] schema = schemaEntry.getSecond();
                if (schema != null) {
                    // Clear the stored schema so it is sent only once per table.
                    this.m_schemas.put(Integer.valueOf(i), Pair.of(schemaEntry.getFirst(), null));
                    if (rejoinLog.isDebugEnabled()) {
                        rejoinLog.debug("Sending schema for table " + i);
                    }
                    send(StreamSnapshotMessageType.SCHEMA, i, schema, replicated);
                }

                // Fill in the block header, then rewind so the whole buffer is sent.
                final ByteBuffer buffer = owned.b();
                buffer.put((byte) StreamSnapshotMessageType.DATA.ordinal());
                buffer.putInt(this.m_blockIndex);
                buffer.putInt(i);
                buffer.position(0);

                final int blockIndex = this.m_blockIndex;
                this.m_blockIndex = blockIndex + 1;

                // Give up ownership before the hand-off: once queued, the outstanding-work
                // map discards the container, and discarding it here too would release it twice.
                final DBBPool.BBContainer toSend = owned;
                owned = null;
                return send(StreamSnapshotMessageType.DATA, blockIndex, toSend, replicated);
            } catch (Exception e) {
                setWriteFailed(e);
                if (owned != null) {
                    owned.discard();
                }
                return Futures.immediateFailedFuture(e);
            } finally {
                rejoinLog.trace("Finished call to write");
            }
        }
    }

    /**
     * Frames a byte array as a block (1-byte type, 4-byte block index, 4-byte
     * table id, then the payload), consumes the next block index and queues
     * the block for sending.
     *
     * @return the future of the queued send
     */
    private synchronized ListenableFuture<Boolean> send(StreamSnapshotMessageType type, int tableId, byte[] payload, boolean replicated) {
        final ByteBuffer framed = ByteBuffer.allocate(9 + payload.length);
        final int blockIndex = m_blockIndex;
        framed.put((byte) type.ordinal())
              .putInt(blockIndex)
              .putInt(tableId)
              .put(payload);
        framed.flip();
        m_blockIndex = blockIndex + 1;
        return send(type, blockIndex, DBBPool.wrapBB(framed), replicated);
    }

    /**
     * Wraps a ready-made block in a {@link SendWork}, records it as
     * outstanding under its block index and hands it to the sender.
     *
     * @param streamSnapshotMessageType message type of the block
     * @param i block index used as the outstanding-work key
     * @param bBContainer the block; owned by the created work from here on
     * @param z whether the block also goes to the other destination sites
     * @return future completed once the sender has attempted the send
     */
    synchronized ListenableFuture<Boolean> send(StreamSnapshotMessageType streamSnapshotMessageType, int i, DBBPool.BBContainer bBContainer, boolean z) {
        final SettableFuture<Boolean> sendAttempted = SettableFuture.create();
        if (rejoinLog.isTraceEnabled()) {
            final String kind = z ? "REPLICATED " : "PARTITIONED ";
            final String extraDests = z ? ", " + CoreUtils.hsIdCollectionToString(m_otherDestHostHSIds) : "";
            rejoinLog.trace("Sending block " + i + " of type " + kind + streamSnapshotMessageType.name() + " from targetId " + m_targetId + " site " + CoreUtils.hsIdToString(m_srcHSId) + " to " + CoreUtils.hsIdToString(m_destHSId) + extraDests);
        }
        final Set<Long> otherDests = z ? m_otherDestHostHSIds : null;
        final SendWork work = new SendWork(streamSnapshotMessageType, m_targetId, m_destHSId, otherDests, bBContainer, sendAttempted);
        m_outstandingWork.put(Integer.valueOf(i), work);
        m_outstandingWorkCount.incrementAndGet();
        m_sender.offer(work);
        return sendAttempted;
    }

    /**
     * Stores a serialization failure; {@link #close()} rethrows the stored value.
     *
     * @param iOException the failure to remember (replaces any earlier one)
     */
    @Override // org.voltdb.SnapshotDataTarget
    public void reportSerializationFailure(IOException iOException) {
        m_reportedSerializationFailure = iOException;
    }

    /** @return the failure last passed to {@link #reportSerializationFailure}, or null if none */
    @Override // org.voltdb.SnapshotDataTarget
    public Exception getSerializationException() {
        return m_reportedSerializationFailure;
    }

    /**
     * Reports whether this target needs a final close.
     *
     * @return always true
     */
    @Override // org.voltdb.SnapshotDataTarget
    public boolean needsFinalClose() {
        return true;
    }

    /**
     * Closes the stream. On the first call it waits for outstanding work,
     * sends the end-of-stream block, enqueues the end-of-queue marker for the
     * sender and marks the target closed. On every call it then runs the
     * on-close handler (if any) and rethrows a recorded failure.
     *
     * @throws IOException the reported serialization failure if there is one;
     *         otherwise the recorded write failure, rethrown as-is when it is
     *         an IOException or unchecked and wrapped in an IOException when not
     */
    @Override // org.voltdb.SnapshotDataTarget
    public void close() throws IOException, InterruptedException {
        final boolean alreadyClosed = m_closed.get();
        if (!alreadyClosed) {
            rejoinLog.trace("Closing stream snapshot target " + m_targetId);
            waitForOutstandingWork();
            sendEOS();
            // Empty work is the end-of-queue marker for the sender thread.
            m_sender.offer(new SendWork());
            synchronized (this) {
                m_closed.set(true);
                // Decompiled assertion: nothing may still be outstanding at this point.
                if (!$assertionsDisabled && m_outstandingWork.size() != 0) {
                    throw new AssertionError();
                }
            }
            rejoinLog.trace("Closed stream snapshot target " + m_targetId);
        }

        final Runnable onClose = m_onCloseHandler.get();
        if (onClose != null) {
            onClose.run();
        }

        final IOException serializationFailure = m_reportedSerializationFailure;
        if (serializationFailure != null) {
            throw serializationFailure;
        }

        final Exception writeFailure = m_writeFailed.get();
        if (writeFailure == null) {
            return;
        }
        Throwables.propagateIfPossible(writeFailure, IOException.class);
        throw new IOException(writeFailure);
    }

    /**
     * Sends the end-of-stream block and waits for it to be acknowledged. The
     * block's type byte is FAILURE when a write failure is recorded and END
     * otherwise; the work itself is always queued with type END.
     */
    private void sendEOS() {
        final boolean failed = m_writeFailed.get() != null;
        final StreamSnapshotMessageType marker = failed ? StreamSnapshotMessageType.FAILURE : StreamSnapshotMessageType.END;

        final int blockIndex = m_blockIndex;
        final ByteBuffer eos = ByteBuffer.allocate(5);
        eos.put((byte) marker.ordinal());
        eos.putInt(blockIndex);
        eos.flip();
        m_blockIndex = blockIndex + 1;

        send(StreamSnapshotMessageType.END, blockIndex, DBBPool.wrapBB(eos), m_replicatedTableTarget);
        waitForOutstandingWork();
    }

    /**
     * Blocks until no work is outstanding, a write failure is recorded, or
     * the ack receiver has stopped, then clears whatever is still
     * outstanding. An interrupt does not end the wait; it is remembered and
     * the thread's interrupt flag is restored before the cleanup.
     */
    private synchronized void waitForOutstandingWork() {
        boolean interrupted = false;
        for (;;) {
            final boolean keepWaiting = m_writeFailed.get() == null
                    && m_outstandingWorkCount.get() > 0
                    && !m_ackReceiver.isStopped();
            if (!keepWaiting) {
                break;
            }
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        clearOutstanding();
    }

    /** @return the sender's count of compressed bytes sent for this target */
    @Override // org.voltdb.SnapshotDataTarget
    public long getBytesWritten() {
        final AtomicLong sentBytes = m_sender.m_bytesSent.get(Long.valueOf(m_targetId));
        return sentBytes.get();
    }

    /** @return the sender's count of work items sent for this target */
    public long getWorksWritten() {
        final AtomicLong sentWorks = m_sender.m_worksSent.get(Long.valueOf(m_targetId));
        return sentWorks.get();
    }

    /**
     * Installs the callback that {@link #close()} runs after closing the stream.
     *
     * @param runnable the callback; replaces any earlier one
     */
    @Override // org.voltdb.SnapshotDataTarget
    public void setOnCloseHandler(Runnable runnable) {
        m_onCloseHandler.set(runnable);
    }

    /**
     * @return the sender's last exception if it has one, otherwise the
     *         recorded write failure (which may be null)
     */
    @Override // org.voltdb.SnapshotDataTarget
    public synchronized Exception getLastWriteException() {
        final Exception senderFailure = m_sender.m_lastException;
        if (senderFailure != null) {
            return senderFailure;
        }
        return m_writeFailed.get();
    }

    /**
     * Identifies the snapshot format this target produces.
     *
     * @return always {@code SnapshotFormat.STREAM}
     */
    @Override // org.voltdb.SnapshotDataTarget
    public SnapshotFormat getFormat() {
        return SnapshotFormat.STREAM;
    }

    /**
     * Reads the row count from a data container without disturbing the
     * container's own buffer position. The value is the second int after the
     * block header.
     *
     * @param bBContainer container holding a data block
     * @return the int stored 4 bytes past the header
     */
    @Override // org.voltdb.SnapshotDataTarget
    public int getInContainerRowCount(DBBPool.BBContainer bBContainer) {
        // Work on a duplicate so the caller's position and limit stay untouched.
        final ByteBuffer view = bBContainer.b().duplicate();
        view.position(getHeaderSize());
        view.getInt(); // skip the int that precedes the row count
        final int rowCount = view.getInt();
        return rowCount;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Stops the ack receiver and records {@code exc} as the write failure if
     * none is recorded yet. Waiting threads are woken only when this call
     * recorded the failure.
     *
     * @param exc the failure to record
     */
    public synchronized void setWriteFailed(Exception exc) {
        m_ackReceiver.forceStop();
        final boolean firstFailure = m_writeFailed.compareAndSet(null, exc);
        if (firstFailure) {
            notifyAll();
        }
    }

    /**
     * Looks up the stored schema bytes for a table.
     *
     * @param i table id; must be one of the tables given at construction
     * @return the schema bytes, or null once write() has sent the schema
     */
    protected byte[] getSchema(int i) {
        final Pair<Boolean, byte[]> schemaEntry = m_schemas.get(Integer.valueOf(i));
        return schemaEntry.getSecond();
    }

    /**
     * Installs the callback that {@link #trackProgress()} runs.
     *
     * @param runnable the callback; replaces any earlier one
     */
    @Override // org.voltdb.SnapshotDataTarget
    public void setInProgressHandler(Runnable runnable) {
        m_progressHandler = runnable;
    }

    /**
     * Runs the in-progress handler, if one has been installed. The handler
     * field starts out null, so a call made before
     * {@code setInProgressHandler} is a no-op instead of a NullPointerException.
     */
    @Override // org.voltdb.SnapshotDataTarget
    public void trackProgress() {
        // Read the field once so the null check and the call see the same handler.
        final Runnable handler = this.m_progressHandler;
        if (handler != null) {
            handler.run();
        }
    }

    // Static initialisation. The order matters: m_rejoinDeathTestMode must be
    // assigned before DEFAULT_WRITE_TIMEOUT_MS, which reads it.
    static {
        // Decompiled form of the assertion-status flag used by the "$assertionsDisabled" checks.
        $assertionsDisabled = !StreamSnapshotDataTarget.class.desiredAssertionStatus();
        rejoinLog = new VoltLogger("REJOIN");
        // Death-test mode is on whenever the property exists, whatever its value.
        m_rejoinDeathTestMode = System.getProperties().containsKey("rejoindeathtest");
        m_totalSnapshotTargetCount = new AtomicLong(0L);
        // 10 s in death-test mode; otherwise the REJOIN_WRITE_TIMEOUT_MS system property,
        // falling back to EstTimeUpdater.maxErrorReportInterval when the property is unset.
        DEFAULT_WRITE_TIMEOUT_MS = m_rejoinDeathTestMode ? 10000L : Long.getLong("REJOIN_WRITE_TIMEOUT_MS", EstTimeUpdater.maxErrorReportInterval).longValue();
    }
}
