package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalFileTimestampVersionFinder;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/StreamAppenderator.class */
public class StreamAppenderator implements Appenderator {
    public static final int ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER = 1000;
    public static final int ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER = 700;
    public static final int ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER = 600;
    public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
    public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
    private static final EmittingLogger log = new EmittingLogger(StreamAppenderator.class);
    private static final int WARN_DELAY = 1000;
    private static final String IDENTIFIER_FILE_NAME = "identifier.json";
    private final String myId;
    private final DataSchema schema;
    private final AppenderatorConfig tuningConfig;
    private final FireDepartmentMetrics metrics;
    private final DataSegmentPusher dataSegmentPusher;
    private final ObjectMapper objectMapper;
    private final DataSegmentAnnouncer segmentAnnouncer;
    private final IndexIO indexIO;
    private final IndexMerger indexMerger;
    private final Cache cache;
    private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
    private final long maxBytesTuningConfig;
    private final boolean skipBytesInMemoryOverheadCheck;
    private final QuerySegmentWalker texasRanger;
    private final RowIngestionMeters rowIngestionMeters;
    private final ParseExceptionHandler parseExceptionHandler;
    private volatile long nextFlush;
    private volatile Throwable persistError;
    private final ConcurrentMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap();
    private final Set<SegmentIdWithShardSpec> droppingSinks = Sets.newConcurrentHashSet();
    private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
    private final AtomicInteger totalRows = new AtomicInteger();
    private final AtomicLong bytesCurrentlyInMemory = new AtomicLong();
    private final Lock commitLock = new ReentrantLock();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private volatile ListeningExecutorService persistExecutor = null;
    private volatile ListeningExecutorService pushExecutor = null;
    private volatile ListeningExecutorService intermediateTempExecutor = null;
    private volatile FileLock basePersistDirLock = null;
    private volatile FileChannel basePersistDirLockChannel = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamAppenderator(String str, DataSchema dataSchema, AppenderatorConfig appenderatorConfig, FireDepartmentMetrics fireDepartmentMetrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, DataSegmentAnnouncer dataSegmentAnnouncer, @Nullable SinkQuerySegmentWalker sinkQuerySegmentWalker, IndexIO indexIO, IndexMerger indexMerger, Cache cache, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler) {
        this.myId = str;
        this.schema = (DataSchema) Preconditions.checkNotNull(dataSchema, "schema");
        this.tuningConfig = (AppenderatorConfig) Preconditions.checkNotNull(appenderatorConfig, "tuningConfig");
        this.metrics = (FireDepartmentMetrics) Preconditions.checkNotNull(fireDepartmentMetrics, "metrics");
        this.dataSegmentPusher = (DataSegmentPusher) Preconditions.checkNotNull(dataSegmentPusher, "dataSegmentPusher");
        this.objectMapper = (ObjectMapper) Preconditions.checkNotNull(objectMapper, "objectMapper");
        this.segmentAnnouncer = (DataSegmentAnnouncer) Preconditions.checkNotNull(dataSegmentAnnouncer, "segmentAnnouncer");
        this.indexIO = (IndexIO) Preconditions.checkNotNull(indexIO, "indexIO");
        this.indexMerger = (IndexMerger) Preconditions.checkNotNull(indexMerger, "indexMerger");
        this.cache = cache;
        this.texasRanger = sinkQuerySegmentWalker;
        this.rowIngestionMeters = (RowIngestionMeters) Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
        this.parseExceptionHandler = (ParseExceptionHandler) Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
        if (sinkQuerySegmentWalker == null) {
            this.sinkTimeline = new VersionedIntervalTimeline<>(String.CASE_INSENSITIVE_ORDER);
        } else {
            this.sinkTimeline = sinkQuerySegmentWalker.getSinkTimeline();
        }
        this.maxBytesTuningConfig = appenderatorConfig.getMaxBytesInMemoryOrDefault();
        this.skipBytesInMemoryOverheadCheck = appenderatorConfig.isSkipBytesInMemoryOverheadCheck();
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public String getId() {
        return this.myId;
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public String getDataSource() {
        return this.schema.getDataSource();
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public Object startJob() {
        this.tuningConfig.getBasePersistDirectory().mkdirs();
        lockBasePersistDirectory();
        Object bootstrapSinksFromDisk = bootstrapSinksFromDisk();
        initializeExecutors();
        resetNextFlush();
        return bootstrapSinksFromDisk;
    }

    private void throwPersistErrorIfExists() {
        if (this.persistError != null) {
            throw new RE(this.persistError, "Error while persisting", new Object[0]);
        }
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public Appenderator.AppenderatorAddResult add(SegmentIdWithShardSpec segmentIdWithShardSpec, InputRow inputRow, @Nullable Supplier<Committer> supplier, boolean z) throws IndexSizeExceededException, SegmentNotWritableException {
        throwPersistErrorIfExists();
        if (!segmentIdWithShardSpec.getDataSource().equals(this.schema.getDataSource())) {
            throw new IAE("Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!", new Object[]{this.schema.getDataSource(), segmentIdWithShardSpec.getDataSource()});
        }
        Sink orCreateSink = getOrCreateSink(segmentIdWithShardSpec);
        this.metrics.reportMessageMaxTimestamp(inputRow.getTimestampFromEpoch());
        int numRowsInMemory = orCreateSink.getNumRowsInMemory();
        long bytesInMemory = orCreateSink.getBytesInMemory();
        try {
            IncrementalIndexAddResult add = orCreateSink.add(inputRow, !z);
            int rowCount = add.getRowCount();
            long bytesInMemory2 = add.getBytesInMemory();
            if (rowCount < 0) {
                throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", segmentIdWithShardSpec);
            }
            if (add.isRowAdded()) {
                this.rowIngestionMeters.incrementProcessed();
            } else if (add.hasParseException()) {
                this.parseExceptionHandler.handle(add.getParseException());
            }
            int i = rowCount - numRowsInMemory;
            this.rowsCurrentlyInMemory.addAndGet(i);
            this.bytesCurrentlyInMemory.addAndGet(bytesInMemory2 - bytesInMemory);
            this.totalRows.addAndGet(i);
            boolean z2 = false;
            boolean z3 = false;
            ArrayList arrayList = new ArrayList();
            if (!orCreateSink.canAppendRow()) {
                z3 = true;
                arrayList.add("No more rows can be appended to sink");
            }
            if (System.currentTimeMillis() > this.nextFlush) {
                z3 = true;
                arrayList.add(StringUtils.format("current time[%d] is greater than nextFlush[%d]", new Object[]{Long.valueOf(System.currentTimeMillis()), Long.valueOf(this.nextFlush)}));
            }
            if (this.rowsCurrentlyInMemory.get() >= this.tuningConfig.getMaxRowsInMemory()) {
                z3 = true;
                arrayList.add(StringUtils.format("rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d]", new Object[]{Integer.valueOf(this.rowsCurrentlyInMemory.get()), Integer.valueOf(this.tuningConfig.getMaxRowsInMemory())}));
            }
            if (this.bytesCurrentlyInMemory.get() >= this.maxBytesTuningConfig) {
                z3 = true;
                arrayList.add(StringUtils.format("(estimated) bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]", new Object[]{Long.valueOf(this.bytesCurrentlyInMemory.get()), Long.valueOf(this.maxBytesTuningConfig)}));
            }
            if (z3) {
                if (z) {
                    log.info("Flushing in-memory data to disk because %s.", new Object[]{String.join(",", arrayList)});
                    long j = 0;
                    Iterator<Map.Entry<SegmentIdWithShardSpec, Sink>> it = this.sinks.entrySet().iterator();
                    while (it.hasNext()) {
                        Sink value = it.next().getValue();
                        if (value != null) {
                            j += value.getBytesInMemory();
                            if (value.swappable()) {
                                this.bytesCurrentlyInMemory.addAndGet(calculateMMappedHydrantMemoryInUsed(orCreateSink.getCurrHydrant()));
                            }
                        }
                    }
                    if (!this.skipBytesInMemoryOverheadCheck && this.bytesCurrentlyInMemory.get() - j > this.maxBytesTuningConfig) {
                        String format = StringUtils.format("Task has exceeded safe estimated heap usage limits, failing (numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > maxBytesTuningConfig: [%d])", new Object[]{Integer.valueOf(this.sinks.size()), Integer.valueOf(this.sinks.values().stream().mapToInt((v0) -> {
                            return Iterables.size(v0);
                        }).sum()), Integer.valueOf(getTotalRowCount()), Long.valueOf(this.bytesCurrentlyInMemory.get()), Long.valueOf(j), Long.valueOf(this.maxBytesTuningConfig)});
                        String format2 = StringUtils.format("%s.\nThis can occur when the overhead from too many intermediary segment persists becomes to great to have enough space to process additional input rows. This check, along with metering the overhead of these objects to factor into the 'maxBytesInMemory' computation, can be disabled by setting 'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so might allow the task to naturally encounter a 'java.lang.OutOfMemoryError'). Alternatively, 'maxBytesInMemory' can be increased which will cause an increase in heap footprint, but will allow for more intermediary segment persists to occur before reaching this condition.", new Object[]{format});
                        log.makeAlert(format, new Object[0]).addData("dataSource", this.schema.getDataSource()).emit();
                        throw new RuntimeException(format2);
                    }
                    Futures.addCallback(persistAll(supplier == null ? null : (Committer) supplier.get()), new FutureCallback<Object>() { // from class: org.apache.druid.segment.realtime.appenderator.StreamAppenderator.1
                        public void onSuccess(@Nullable Object obj) {
                        }

                        public void onFailure(Throwable th) {
                            StreamAppenderator.this.persistError = th;
                        }
                    });
                } else {
                    z2 = true;
                }
            }
            return new Appenderator.AppenderatorAddResult(segmentIdWithShardSpec, orCreateSink.getNumRows(), z2);
        } catch (IndexSizeExceededException e) {
            log.error(e, "Sink for segment[%s] was unexpectedly full!", new Object[]{segmentIdWithShardSpec});
            throw e;
        }
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public List<SegmentIdWithShardSpec> getSegments() {
        return ImmutableList.copyOf(this.sinks.keySet());
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public int getRowCount(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        Sink sink = this.sinks.get(segmentIdWithShardSpec);
        if (sink == null) {
            throw new ISE("No such sink: %s", new Object[]{segmentIdWithShardSpec});
        }
        return sink.getNumRows();
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public int getTotalRowCount() {
        return this.totalRows.get();
    }

    @VisibleForTesting
    int getRowsInMemory() {
        return this.rowsCurrentlyInMemory.get();
    }

    @VisibleForTesting
    long getBytesCurrentlyInMemory() {
        return this.bytesCurrentlyInMemory.get();
    }

    @VisibleForTesting
    long getBytesInMemory(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        Sink sink = this.sinks.get(segmentIdWithShardSpec);
        if (sink == null) {
            throw new ISE("No such sink: %s", new Object[]{segmentIdWithShardSpec});
        }
        return sink.getBytesInMemory();
    }

    private Sink getOrCreateSink(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        Sink sink = this.sinks.get(segmentIdWithShardSpec);
        if (sink == null) {
            sink = new Sink(segmentIdWithShardSpec.getInterval(), this.schema, segmentIdWithShardSpec.getShardSpec(), segmentIdWithShardSpec.getVersion(), this.tuningConfig.getAppendableIndexSpec(), this.tuningConfig.getMaxRowsInMemory(), this.maxBytesTuningConfig, null);
            this.bytesCurrentlyInMemory.addAndGet(calculateSinkMemoryInUsed(sink));
            try {
                this.segmentAnnouncer.announceSegment(sink.getSegment());
            } catch (IOException e) {
                log.makeAlert(e, "Failed to announce new segment[%s]", new Object[]{this.schema.getDataSource()}).addData("interval", sink.getInterval()).emit();
            }
            this.sinks.put(segmentIdWithShardSpec, sink);
            this.metrics.setSinkCount(this.sinks.size());
            this.sinkTimeline.add(sink.getInterval(), sink.getVersion(), segmentIdWithShardSpec.getShardSpec().createChunk(sink));
        }
        return sink;
    }

    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> iterable) {
        if (this.texasRanger == null) {
            throw new IllegalStateException("Don't query me, bro.");
        }
        return this.texasRanger.getQueryRunnerForIntervals(query, iterable);
    }

    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> iterable) {
        if (this.texasRanger == null) {
            throw new IllegalStateException("Don't query me, bro.");
        }
        return this.texasRanger.getQueryRunnerForSegments(query, iterable);
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public void clear() throws InterruptedException {
        try {
            throwPersistErrorIfExists();
            if (this.persistExecutor != null) {
                this.persistExecutor.submit(() -> {
                    try {
                        this.commitLock.lock();
                        this.objectMapper.writeValue(computeCommitFile(), Committed.nil());
                        return null;
                    } finally {
                        this.commitLock.unlock();
                    }
                }).get();
                ArrayList arrayList = new ArrayList();
                for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : this.sinks.entrySet()) {
                    arrayList.add(abandonSegment(entry.getKey(), entry.getValue(), true));
                }
                Futures.allAsList(arrayList).get();
            }
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public ListenableFuture<?> drop(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        Sink sink = this.sinks.get(segmentIdWithShardSpec);
        return sink != null ? abandonSegment(segmentIdWithShardSpec, sink, true) : Futures.immediateFuture((Object) null);
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public ListenableFuture<Object> persistAll(@Nullable final Committer committer) {
        throwPersistErrorIfExists();
        final HashMap hashMap = new HashMap();
        final ArrayList arrayList = new ArrayList();
        int i = 0;
        long j = 0;
        final MutableLong mutableLong = new MutableLong();
        final MutableLong mutableLong2 = new MutableLong();
        final long size = this.sinks.size();
        for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : this.sinks.entrySet()) {
            SegmentIdWithShardSpec key = entry.getKey();
            Sink value = entry.getValue();
            if (value == null) {
                throw new ISE("No sink for identifier: %s", new Object[]{key});
            }
            ArrayList newArrayList = Lists.newArrayList(value);
            mutableLong.add(newArrayList.size());
            hashMap.put(key.toString(), Integer.valueOf(newArrayList.size()));
            i += value.getNumRowsInMemory();
            j += value.getBytesInMemory();
            for (FireHydrant fireHydrant : newArrayList.subList(0, value.isWritable() ? newArrayList.size() - 1 : newArrayList.size())) {
                if (!fireHydrant.hasSwapped()) {
                    log.debug("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", new Object[]{fireHydrant, key});
                    arrayList.add(Pair.of(fireHydrant, key));
                    mutableLong2.add(1L);
                }
            }
            if (value.swappable()) {
                arrayList.add(Pair.of(value.swap(), key));
                mutableLong2.add(1L);
            }
        }
        log.debug("Submitting persist runnable for dataSource[%s]", new Object[]{this.schema.getDataSource()});
        final Object metadata = committer == null ? null : committer.getMetadata();
        Stopwatch createStarted = Stopwatch.createStarted();
        final Stopwatch createStarted2 = Stopwatch.createStarted();
        final AtomicLong atomicLong = new AtomicLong(i);
        ListenableFuture<Object> submit = this.persistExecutor.submit(new Callable<Object>() { // from class: org.apache.druid.segment.realtime.appenderator.StreamAppenderator.2
            @Override // java.util.concurrent.Callable
            public Object call() throws IOException {
                try {
                    try {
                        for (Pair pair : arrayList) {
                            StreamAppenderator.this.metrics.incrementRowOutputCount(StreamAppenderator.this.persistHydrant((FireHydrant) pair.lhs, (SegmentIdWithShardSpec) pair.rhs));
                        }
                        if (committer != null) {
                            StreamAppenderator.log.debug("Committing metadata[%s] for sinks[%s].", new Object[]{metadata, Joiner.on(", ").join((Iterable) hashMap.entrySet().stream().map(entry2 -> {
                                return StringUtils.format("%s:%d", new Object[]{entry2.getKey(), entry2.getValue()});
                            }).collect(Collectors.toList()))});
                            committer.run();
                            try {
                                StreamAppenderator.this.commitLock.lock();
                                HashMap hashMap2 = new HashMap();
                                Committed readCommit = StreamAppenderator.this.readCommit();
                                if (readCommit != null) {
                                    hashMap2.putAll(readCommit.getHydrants());
                                }
                                hashMap2.putAll(hashMap);
                                StreamAppenderator.this.writeCommit(new Committed(hashMap2, metadata));
                                StreamAppenderator.this.commitLock.unlock();
                            } catch (Throwable th) {
                                StreamAppenderator.this.commitLock.unlock();
                                throw th;
                            }
                        }
                        StreamAppenderator.log.info("Flushed in-memory data with commit metadata [%s] for segments: %s", new Object[]{metadata, arrayList.stream().map(pair2 -> {
                            return ((SegmentIdWithShardSpec) pair2.rhs).asSegmentId().toString();
                        }).distinct().collect(Collectors.joining(", "))});
                        StreamAppenderator.log.info("Persisted stats: processed rows: [%d], persisted rows[%d], sinks: [%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across sinks): [%d]", new Object[]{Long.valueOf(StreamAppenderator.this.rowIngestionMeters.getProcessed()), Long.valueOf(atomicLong.get()), Long.valueOf(size), Long.valueOf(mutableLong.longValue()), Long.valueOf(mutableLong2.longValue())});
                        Object obj = metadata;
                        StreamAppenderator.this.metrics.incrementNumPersists();
                        StreamAppenderator.this.metrics.incrementPersistTimeMillis(createStarted2.elapsed(TimeUnit.MILLISECONDS));
                        createStarted2.stop();
                        return obj;
                    } catch (IOException e) {
                        StreamAppenderator.this.metrics.incrementFailedPersists();
                        throw e;
                    }
                } catch (Throwable th2) {
                    StreamAppenderator.this.metrics.incrementNumPersists();
                    StreamAppenderator.this.metrics.incrementPersistTimeMillis(createStarted2.elapsed(TimeUnit.MILLISECONDS));
                    createStarted2.stop();
                    throw th2;
                }
            }
        });
        long elapsed = createStarted.elapsed(TimeUnit.MILLISECONDS);
        this.metrics.incrementPersistBackPressureMillis(elapsed);
        if (elapsed > 1000) {
            log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", new Object[]{Long.valueOf(elapsed)});
        }
        createStarted.stop();
        resetNextFlush();
        this.rowsCurrentlyInMemory.addAndGet(-i);
        this.bytesCurrentlyInMemory.addAndGet(-j);
        log.info("Persisted rows[%,d] and (estimated) bytes[%,d]", new Object[]{Integer.valueOf(i), Long.valueOf(j)});
        return submit;
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public ListenableFuture<SegmentsAndCommitMetadata> push(Collection<SegmentIdWithShardSpec> collection, @Nullable Committer committer, boolean z) {
        HashMap hashMap = new HashMap();
        AtomicLong atomicLong = new AtomicLong();
        for (SegmentIdWithShardSpec segmentIdWithShardSpec : collection) {
            Sink sink = this.sinks.get(segmentIdWithShardSpec);
            if (sink == null) {
                throw new ISE("No sink for identifier: %s", new Object[]{segmentIdWithShardSpec});
            }
            hashMap.put(segmentIdWithShardSpec, sink);
            if (sink.finishWriting()) {
                this.totalRows.addAndGet(-sink.getNumRows());
            }
            atomicLong.addAndGet(Iterables.size(sink));
        }
        return Futures.transform(persistAll(committer), obj -> {
            ArrayList arrayList = new ArrayList();
            log.info("Preparing to push (stats): processed rows: [%d], sinks: [%d], fireHydrants (across sinks): [%d]", new Object[]{Long.valueOf(this.rowIngestionMeters.getProcessed()), Integer.valueOf(hashMap.size()), Long.valueOf(atomicLong.get())});
            log.debug("Building and pushing segments: %s", new Object[]{hashMap.keySet().stream().map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining(", "))});
            for (Map.Entry entry : hashMap.entrySet()) {
                if (this.droppingSinks.contains(entry.getKey())) {
                    log.warn("Skipping push of currently-dropping sink[%s]", new Object[]{entry.getKey()});
                } else {
                    DataSegment mergeAndPush = mergeAndPush((SegmentIdWithShardSpec) entry.getKey(), (Sink) entry.getValue(), z);
                    if (mergeAndPush != null) {
                        arrayList.add(mergeAndPush);
                    } else {
                        log.warn("mergeAndPush[%s] returned null, skipping.", new Object[]{entry.getKey()});
                    }
                }
            }
            log.info("Push complete...", new Object[0]);
            return new SegmentsAndCommitMetadata(arrayList, obj);
        }, this.pushExecutor);
    }

    private ListenableFuture<?> pushBarrier() {
        return this.intermediateTempExecutor.submit(() -> {
            this.pushExecutor.submit(() -> {
            });
        });
    }

    @Nullable
    private DataSegment mergeAndPush(SegmentIdWithShardSpec segmentIdWithShardSpec, Sink sink, boolean z) {
        if (this.sinks.get(segmentIdWithShardSpec) != sink) {
            log.warn("Sink for segment[%s] no longer valid, bailing out of mergeAndPush.", new Object[]{segmentIdWithShardSpec});
            return null;
        }
        File file = new File(computePersistDir(segmentIdWithShardSpec), "merged");
        File computeDescriptorFile = computeDescriptorFile(segmentIdWithShardSpec);
        Iterator<FireHydrant> it = sink.iterator();
        while (it.hasNext()) {
            FireHydrant next = it.next();
            if (sink.isWritable()) {
                throw new ISE("Expected sink to be no longer writable before mergeAndPush for segment[%s].", new Object[]{segmentIdWithShardSpec});
            }
            synchronized (next) {
                if (!next.hasSwapped()) {
                    throw new ISE("Expected sink to be fully persisted before mergeAndPush for segment[%s].", new Object[]{segmentIdWithShardSpec});
                }
            }
        }
        try {
            if (computeDescriptorFile.exists()) {
                if (!z) {
                    log.info("Segment[%s] already pushed, skipping.", new Object[]{segmentIdWithShardSpec});
                    return (DataSegment) this.objectMapper.readValue(computeDescriptorFile, DataSegment.class);
                }
                log.debug("Segment[%s] already pushed, but we want a unique path, so will push again with a new path.", new Object[]{segmentIdWithShardSpec});
            }
            removeDirectory(file);
            if (file.exists()) {
                throw new ISE("Merged target[%s] exists after removing?!", new Object[]{file});
            }
            long nanoTime = System.nanoTime();
            ArrayList arrayList = new ArrayList();
            Closer create = Closer.create();
            try {
                try {
                    Iterator<FireHydrant> it2 = sink.iterator();
                    while (it2.hasNext()) {
                        FireHydrant next2 = it2.next();
                        Pair<ReferenceCountingSegment, Closeable> andIncrementSegment = next2.getAndIncrementSegment();
                        QueryableIndex asQueryableIndex = ((ReferenceCountingSegment) andIncrementSegment.lhs).asQueryableIndex();
                        log.debug("Segment[%s] adding hydrant[%s]", new Object[]{segmentIdWithShardSpec, next2});
                        arrayList.add(asQueryableIndex);
                        create.register((Closeable) andIncrementSegment.rhs);
                    }
                    File mergeQueryableIndex = this.indexMerger.mergeQueryableIndex(arrayList, this.schema.getGranularitySpec().isRollup(), this.schema.getAggregators(), this.schema.getDimensionsSpec(), file, this.tuningConfig.getIndexSpec(), this.tuningConfig.getSegmentWriteOutMediumFactory(), this.tuningConfig.getMaxColumnsToMerge());
                    long nanoTime2 = System.nanoTime();
                    log.debug("Segment[%s] built in %,dms.", new Object[]{segmentIdWithShardSpec, Long.valueOf((nanoTime2 - nanoTime) / 1000000)});
                    create.close();
                    DataSegment withDimensions = sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(arrayList, this.schema.getDimensionsSpec()));
                    DataSegment dataSegment = (DataSegment) RetryUtils.retry(() -> {
                        return this.dataSegmentPusher.push(mergeQueryableIndex, withDimensions, z);
                    }, th -> {
                        return th instanceof Exception;
                    }, 5);
                    long nanoTime3 = System.nanoTime();
                    this.objectMapper.writeValue(computeDescriptorFile, dataSegment);
                    log.info("Segment[%s] of %,d bytes built from %d incremental persist(s) in %,dms; pushed to deep storage in %,dms. Load spec is: %s", new Object[]{segmentIdWithShardSpec, Long.valueOf(dataSegment.getSize()), Integer.valueOf(arrayList.size()), Long.valueOf((nanoTime2 - nanoTime) / 1000000), Long.valueOf((nanoTime3 - nanoTime2) / 1000000), this.objectMapper.writeValueAsString(dataSegment.getLoadSpec())});
                    return dataSegment;
                } catch (Throwable th2) {
                    create.close();
                    throw th2;
                }
            } catch (Throwable th3) {
                throw create.rethrow(th3);
            }
        } catch (Exception e) {
            this.metrics.incrementFailedHandoffs();
            log.warn(e, "Failed to push merged index for segment[%s].", new Object[]{segmentIdWithShardSpec});
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            log.debug("Appenderator already closed, skipping close() call.", new Object[0]);
            return;
        }
        log.debug("Shutting down...", new Object[0]);
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : this.sinks.entrySet()) {
            arrayList.add(abandonSegment(entry.getKey(), entry.getValue(), false));
        }
        try {
            Futures.allAsList(arrayList).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn(e, "Interrupted during close()", new Object[0]);
        } catch (ExecutionException e2) {
            log.warn(e2, "Unable to abandon existing segments during close()", new Object[0]);
        }
        try {
            shutdownExecutors();
            Preconditions.checkState(this.persistExecutor == null || this.persistExecutor.awaitTermination(365L, TimeUnit.DAYS), "persistExecutor not terminated");
            Preconditions.checkState(this.pushExecutor == null || this.pushExecutor.awaitTermination(365L, TimeUnit.DAYS), "pushExecutor not terminated");
            Preconditions.checkState(this.intermediateTempExecutor == null || this.intermediateTempExecutor.awaitTermination(365L, TimeUnit.DAYS), "intermediateTempExecutor not terminated");
            this.persistExecutor = null;
            this.pushExecutor = null;
            this.intermediateTempExecutor = null;
            unlockBasePersistDirectory();
        } catch (InterruptedException e3) {
            Thread.currentThread().interrupt();
            throw new ISE("Failed to shutdown executors during close()", new Object[0]);
        }
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public void closeNow() {
        if (!this.closed.compareAndSet(false, true)) {
            log.debug("Appenderator already closed, skipping closeNow() call.", new Object[0]);
            return;
        }
        log.debug("Shutting down immediately...", new Object[0]);
        for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : this.sinks.entrySet()) {
            try {
                this.segmentAnnouncer.unannounceSegment(entry.getValue().getSegment());
            } catch (Exception e) {
                log.makeAlert(e, "Failed to unannounce segment[%s]", new Object[]{this.schema.getDataSource()}).addData("identifier", entry.getKey().toString()).emit();
            }
        }
        try {
            shutdownExecutors();
            Preconditions.checkState(this.persistExecutor == null || this.persistExecutor.awaitTermination(365L, TimeUnit.DAYS), "persistExecutor not terminated");
            Preconditions.checkState(this.intermediateTempExecutor == null || this.intermediateTempExecutor.awaitTermination(365L, TimeUnit.DAYS), "intermediateTempExecutor not terminated");
            this.persistExecutor = null;
            this.intermediateTempExecutor = null;
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new ISE("Failed to shutdown executors during close()", new Object[0]);
        }
    }

    private void lockBasePersistDirectory() {
        if (this.basePersistDirLock == null) {
            try {
                this.basePersistDirLockChannel = FileChannel.open(computeLockFile().toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
                this.basePersistDirLock = this.basePersistDirLockChannel.tryLock();
                if (this.basePersistDirLock == null) {
                    throw new ISE("Cannot acquire lock on basePersistDir: %s", new Object[]{computeLockFile()});
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void unlockBasePersistDirectory() {
        try {
            if (this.basePersistDirLock != null) {
                this.basePersistDirLock.release();
                this.basePersistDirLockChannel.close();
                this.basePersistDirLock = null;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void initializeExecutors() {
        int maxPendingPersists = this.tuningConfig.getMaxPendingPersists();
        if (this.persistExecutor == null) {
            this.persistExecutor = MoreExecutors.listeningDecorator(Execs.newBlockingSingleThreaded("[" + StringUtils.encodeForFormat(this.myId) + "]-appenderator-persist", maxPendingPersists));
        }
        if (this.pushExecutor == null) {
            this.pushExecutor = MoreExecutors.listeningDecorator(Execs.newBlockingSingleThreaded("[" + StringUtils.encodeForFormat(this.myId) + "]-appenderator-merge", 1));
        }
        if (this.intermediateTempExecutor == null) {
            this.intermediateTempExecutor = MoreExecutors.listeningDecorator(Execs.newBlockingSingleThreaded("[" + StringUtils.encodeForFormat(this.myId) + "]-appenderator-abandon", 0));
        }
    }

    private void shutdownExecutors() {
        if (this.persistExecutor != null) {
            this.persistExecutor.shutdownNow();
        }
        if (this.pushExecutor != null) {
            this.pushExecutor.shutdownNow();
        }
        if (this.intermediateTempExecutor != null) {
            this.intermediateTempExecutor.shutdownNow();
        }
    }

    private void resetNextFlush() {
        this.nextFlush = DateTimes.nowUtc().plus(this.tuningConfig.getIntermediatePersistPeriod()).getMillis();
    }

    private Object bootstrapSinksFromDisk() {
        File[] listFiles;
        Preconditions.checkState(this.sinks.isEmpty(), "Already bootstrapped?!");
        File basePersistDirectory = this.tuningConfig.getBasePersistDirectory();
        if (!basePersistDirectory.exists() || (listFiles = basePersistDirectory.listFiles()) == null) {
            return null;
        }
        try {
            try {
                this.commitLock.lock();
                File computeCommitFile = computeCommitFile();
                Committed nil = computeCommitFile.exists() ? (Committed) this.objectMapper.readValue(computeCommitFile, Committed.class) : Committed.nil();
                int i = 0;
                if (nil.equals(Committed.nil())) {
                    log.debug("No previously committed metadata.", new Object[0]);
                } else {
                    log.info("Loading partially-persisted segments[%s] from[%s] with commit metadata: %s", new Object[]{String.join(", ", (Iterable<? extends CharSequence>) nil.getHydrants().keySet()), basePersistDirectory, nil.getMetadata()});
                }
                for (File file : listFiles) {
                    if (new File(file, IDENTIFIER_FILE_NAME).isFile()) {
                        try {
                            SegmentIdWithShardSpec segmentIdWithShardSpec = (SegmentIdWithShardSpec) this.objectMapper.readValue(new File(file, IDENTIFIER_FILE_NAME), SegmentIdWithShardSpec.class);
                            int committedHydrants = nil.getCommittedHydrants(segmentIdWithShardSpec.toString());
                            if (committedHydrants <= 0) {
                                log.info("Removing uncommitted segment at [%s].", new Object[]{file});
                                FileUtils.deleteDirectory(file);
                            } else {
                                File[] listFiles2 = file.listFiles((file2, str) -> {
                                    return Ints.tryParse(str) != null;
                                });
                                Arrays.sort(listFiles2, (file3, file4) -> {
                                    return Ints.compare(Integer.parseInt(file3.getName()), Integer.parseInt(file4.getName()));
                                });
                                ArrayList arrayList = new ArrayList();
                                for (File file5 : listFiles2) {
                                    int parseInt = Integer.parseInt(file5.getName());
                                    if (parseInt >= committedHydrants) {
                                        log.info("Removing uncommitted partial segment at [%s]", new Object[]{file5});
                                        FileUtils.deleteDirectory(file5);
                                    } else {
                                        log.debug("Loading previously persisted partial segment at [%s]", new Object[]{file5});
                                        if (parseInt != arrayList.size()) {
                                            throw new ISE("Missing hydrant [%,d] in sinkDir [%s].", new Object[]{Integer.valueOf(arrayList.size()), file});
                                        }
                                        arrayList.add(new FireHydrant(new QueryableIndexSegment(this.indexIO.loadIndex(file5), segmentIdWithShardSpec.asSegmentId()), parseInt));
                                    }
                                }
                                if (committedHydrants != arrayList.size()) {
                                    throw new ISE("Missing hydrant [%,d] in sinkDir [%s].", new Object[]{Integer.valueOf(arrayList.size()), file});
                                }
                                Sink sink = new Sink(segmentIdWithShardSpec.getInterval(), this.schema, segmentIdWithShardSpec.getShardSpec(), segmentIdWithShardSpec.getVersion(), this.tuningConfig.getAppendableIndexSpec(), this.tuningConfig.getMaxRowsInMemory(), this.maxBytesTuningConfig, null, arrayList);
                                i += sink.getNumRows();
                                this.sinks.put(segmentIdWithShardSpec, sink);
                                this.sinkTimeline.add(sink.getInterval(), sink.getVersion(), segmentIdWithShardSpec.getShardSpec().createChunk(sink));
                                this.segmentAnnouncer.announceSegment(sink.getSegment());
                            }
                        } catch (IOException e) {
                            log.makeAlert(e, "Problem loading sink[%s] from disk.", new Object[]{this.schema.getDataSource()}).addData("sinkDir", file).emit();
                        }
                    }
                }
                Sets.SetView difference = Sets.difference(nil.getHydrants().keySet(), Sets.newHashSet(Iterables.transform(this.sinks.keySet(), (v0) -> {
                    return v0.toString();
                })));
                if (!difference.isEmpty()) {
                    throw new ISE("Missing committed sinks [%s]", new Object[]{Joiner.on(", ").join(difference)});
                }
                this.totalRows.set(i);
                return nil.getMetadata();
            } catch (Exception e2) {
                throw new ISE(e2, "Failed to read commitFile: %s", new Object[]{null});
            }
        } finally {
            this.commitLock.unlock();
        }
    }

    private ListenableFuture<?> abandonSegment(final SegmentIdWithShardSpec segmentIdWithShardSpec, final Sink sink, final boolean z) {
        if (sink.finishWriting()) {
            this.rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
            this.bytesCurrentlyInMemory.addAndGet(-sink.getBytesInMemory());
            this.bytesCurrentlyInMemory.addAndGet(-calculateSinkMemoryInUsed(sink));
            Iterator<FireHydrant> it = sink.iterator();
            while (it.hasNext()) {
                if (!it.next().equals(sink.getCurrHydrant())) {
                    this.bytesCurrentlyInMemory.addAndGet(-calculateMMappedHydrantMemoryInUsed(r0));
                }
            }
            this.totalRows.addAndGet(-sink.getNumRows());
        }
        this.droppingSinks.add(segmentIdWithShardSpec);
        return Futures.transform(pushBarrier(), new Function<Object, Void>() { // from class: org.apache.druid.segment.realtime.appenderator.StreamAppenderator.3
            @Nullable
            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
            public Void m166apply(@Nullable Object obj) {
                if (!StreamAppenderator.this.sinks.remove(segmentIdWithShardSpec, sink)) {
                    StreamAppenderator.log.error("Sink for segment[%s] no longer valid, not abandoning.", new Object[]{segmentIdWithShardSpec});
                    return null;
                }
                StreamAppenderator.this.metrics.setSinkCount(StreamAppenderator.this.sinks.size());
                if (z) {
                    StreamAppenderator.log.debug("Removing commit metadata for segment[%s].", new Object[]{segmentIdWithShardSpec});
                    try {
                        try {
                            StreamAppenderator.this.commitLock.lock();
                            Committed readCommit = StreamAppenderator.this.readCommit();
                            if (readCommit != null) {
                                StreamAppenderator.this.writeCommit(readCommit.without(segmentIdWithShardSpec.toString()));
                            }
                        } catch (Exception e) {
                            StreamAppenderator.log.makeAlert(e, "Failed to update committed segments[%s]", new Object[]{StreamAppenderator.this.schema.getDataSource()}).addData("identifier", segmentIdWithShardSpec.toString()).emit();
                            throw new RuntimeException(e);
                        }
                    } finally {
                        StreamAppenderator.this.commitLock.unlock();
                    }
                }
                try {
                    StreamAppenderator.this.segmentAnnouncer.unannounceSegment(sink.getSegment());
                } catch (Exception e2) {
                    StreamAppenderator.log.makeAlert(e2, "Failed to unannounce segment[%s]", new Object[]{StreamAppenderator.this.schema.getDataSource()}).addData("identifier", segmentIdWithShardSpec.toString()).emit();
                }
                StreamAppenderator.this.droppingSinks.remove(segmentIdWithShardSpec);
                StreamAppenderator.this.sinkTimeline.remove(sink.getInterval(), sink.getVersion(), segmentIdWithShardSpec.getShardSpec().createChunk(sink));
                Iterator<FireHydrant> it2 = sink.iterator();
                while (it2.hasNext()) {
                    FireHydrant next = it2.next();
                    if (StreamAppenderator.this.cache != null) {
                        StreamAppenderator.this.cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(next));
                    }
                    next.swapSegment(null);
                }
                if (z) {
                    StreamAppenderator.this.removeDirectory(StreamAppenderator.this.computePersistDir(segmentIdWithShardSpec));
                }
                StreamAppenderator.log.info("Dropped segment[%s].", new Object[]{segmentIdWithShardSpec});
                return null;
            }
        }, this.persistExecutor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Committed readCommit() throws IOException {
        File computeCommitFile = computeCommitFile();
        if (computeCommitFile.exists()) {
            return (Committed) this.objectMapper.readValue(computeCommitFile, Committed.class);
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void writeCommit(Committed committed) throws IOException {
        this.objectMapper.writeValue(computeCommitFile(), committed);
    }

    private File computeCommitFile() {
        return new File(this.tuningConfig.getBasePersistDirectory(), "commit.json");
    }

    private File computeLockFile() {
        return new File(this.tuningConfig.getBasePersistDirectory(), ".lock");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public File computePersistDir(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        return new File(this.tuningConfig.getBasePersistDirectory(), segmentIdWithShardSpec.toString());
    }

    private File computeIdentifierFile(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        return new File(computePersistDir(segmentIdWithShardSpec), IDENTIFIER_FILE_NAME);
    }

    private File computeDescriptorFile(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        return new File(computePersistDir(segmentIdWithShardSpec), "descriptor.json");
    }

    private File createPersistDirIfNeeded(SegmentIdWithShardSpec segmentIdWithShardSpec) throws IOException {
        File computePersistDir = computePersistDir(segmentIdWithShardSpec);
        org.apache.commons.io.FileUtils.forceMkdir(computePersistDir);
        this.objectMapper.writeValue(computeIdentifierFile(segmentIdWithShardSpec), segmentIdWithShardSpec);
        return computePersistDir;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int persistHydrant(FireHydrant fireHydrant, SegmentIdWithShardSpec segmentIdWithShardSpec) {
        synchronized (fireHydrant) {
            if (fireHydrant.hasSwapped()) {
                log.info("Segment[%s] hydrant[%s] already swapped. Ignoring request to persist.", new Object[]{segmentIdWithShardSpec, fireHydrant});
                return 0;
            }
            log.debug("Segment[%s], persisting Hydrant[%s]", new Object[]{segmentIdWithShardSpec, fireHydrant});
            try {
                long nanoTime = System.nanoTime();
                int size = fireHydrant.getIndex().size();
                File persist = this.indexMerger.persist(fireHydrant.getIndex(), segmentIdWithShardSpec.getInterval(), new File(createPersistDirIfNeeded(segmentIdWithShardSpec), String.valueOf(fireHydrant.getCount())), this.tuningConfig.getIndexSpecForIntermediatePersists(), this.tuningConfig.getSegmentWriteOutMediumFactory());
                log.info("Flushed in-memory data for segment[%s] spill[%s] to disk in [%,d] ms (%,d rows).", new Object[]{fireHydrant.getSegmentId(), Integer.valueOf(fireHydrant.getCount()), Long.valueOf((System.nanoTime() - nanoTime) / 1000000), Integer.valueOf(size)});
                fireHydrant.swapSegment(new QueryableIndexSegment(this.indexIO.loadIndex(persist), fireHydrant.getSegmentId()));
                return size;
            } catch (IOException e) {
                log.makeAlert("Incremental persist failed", new Object[0]).addData("segment", segmentIdWithShardSpec.toString()).addData("dataSource", this.schema.getDataSource()).addData("count", Integer.valueOf(fireHydrant.getCount())).emit();
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeDirectory(File file) {
        if (file.exists()) {
            try {
                FileUtils.deleteDirectory(file);
            } catch (Exception e) {
                log.makeAlert(e, "Failed to remove directory[%s]", new Object[]{this.schema.getDataSource()}).addData(LocalFileTimestampVersionFinder.URI_SCHEME, file).emit();
            }
        }
    }

    private int calculateMMappedHydrantMemoryInUsed(FireHydrant fireHydrant) {
        if (this.skipBytesInMemoryOverheadCheck) {
            return 0;
        }
        return 1012 + (fireHydrant.getSegmentNumDimensionColumns() * 1000) + (fireHydrant.getSegmentNumMetricColumns() * 700) + 600;
    }

    private int calculateSinkMemoryInUsed(Sink sink) {
        return this.skipBytesInMemoryOverheadCheck ? 0 : 5000;
    }
}
