package org.voltdb.export;

import com.google_voltpatches.common.collect.ImmutableList;
import com.google_voltpatches.common.collect.Sets;
import com.google_voltpatches.common.util.concurrent.Futures;
import com.google_voltpatches.common.util.concurrent.ListenableFuture;
import com.google_voltpatches.common.util.concurrent.ListeningExecutorService;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper_voltpatches.AsyncCallback;
import org.apache.zookeeper_voltpatches.CreateMode;
import org.apache.zookeeper_voltpatches.KeeperException;
import org.apache.zookeeper_voltpatches.WatchedEvent;
import org.apache.zookeeper_voltpatches.Watcher;
import org.apache.zookeeper_voltpatches.ZooDefs;
import org.hsqldb_voltpatches.Tokens;
import org.voltcore.logging.Level;
import org.voltcore.logging.VoltLogger;
import org.voltcore.messaging.BinaryPayloadMessage;
import org.voltcore.messaging.HostMessenger;
import org.voltcore.messaging.Mailbox;
import org.voltcore.messaging.VoltMessage;
import org.voltcore.utils.CoreUtils;
import org.voltcore.utils.DBBPool;
import org.voltcore.utils.EstTime;
import org.voltcore.utils.Pair;
import org.voltcore.utils.RateLimitedLogger;
import org.voltcore.zk.ZKUtil;
import org.voltdb.CatalogContext;
import org.voltdb.ExportStatsBase;
import org.voltdb.RealVoltDB;
import org.voltdb.SnapshotCompletionMonitor;
import org.voltdb.TableType;
import org.voltdb.VoltDB;
import org.voltdb.VoltTable;
import org.voltdb.catalog.CatalogMap;
import org.voltdb.catalog.Connector;
import org.voltdb.catalog.Table;
import org.voltdb.common.Constants;
import org.voltdb.export.ExportDataSource;
import org.voltdb.exportclient.ExportClientBase;
import org.voltdb.messaging.LocalMailbox;
import org.voltdb.utils.CatalogUtil;
import org.voltdb.utils.PbdSegmentName;

/* loaded from: input_file:org/voltdb/export/ExportGeneration.class */
public class ExportGeneration implements Generation {
    public final File m_directory;
    private String m_mailboxesZKPath;
    private final HostMessenger m_messenger;
    public volatile int m_catalogVersion;
    private static final VoltLogger exportLog = new VoltLogger("EXPORT");
    private static final RateLimitedLogger exportLogLimited = new RateLimitedLogger(TimeUnit.MINUTES.toMillis(1), exportLog, Level.INFO);
    private static final RateLimitedLogger exportLogLimitedPush = new RateLimitedLogger(TimeUnit.MINUTES.toMillis(1), exportLog, Level.INFO);
    private static final ListeningExecutorService m_childUpdatingThread = CoreUtils.getListeningExecutorService("Export ZK Watcher", 1);
    private final Map<Integer, Map<String, ExportDataSource>> m_dataSourcesByPartition = new HashMap();
    private Map<Integer, ImmutableList<Long>> m_replicasHSIds = new HashMap();
    private Mailbox m_mbox = null;
    private volatile boolean m_shutdown = false;
    private Set<Integer> m_removingPartitions = ConcurrentHashMap.newKeySet();

    public ExportGeneration(File file, HostMessenger hostMessenger) throws IOException {
        this.m_directory = file;
        this.m_messenger = hostMessenger;
        if (!this.m_directory.canWrite() && !this.m_directory.mkdirs()) {
            throw new IOException("Could not create " + this.m_directory);
        }
        if (exportLog.isDebugEnabled()) {
            exportLog.debug("Creating new export generation.");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initialize(int i, CatalogContext catalogContext, CatalogMap<Connector> catalogMap, ExportDataProcessor exportDataProcessor, Map<Integer, Integer> map, File file) {
        File[] listFiles = file.listFiles();
        if (listFiles != null) {
            initializeGenerationFromDisk(catalogMap, exportDataProcessor, listFiles, map, catalogContext.m_genId);
        }
        initializeGenerationFromCatalog(catalogContext, catalogMap, exportDataProcessor, i, map, false);
    }

    private void initializeGenerationFromDisk(CatalogMap<Connector> catalogMap, ExportDataProcessor exportDataProcessor, File[] fileArr, Map<Integer, Integer> map, long j) {
        ArrayList arrayList = new ArrayList();
        NavigableSet<Table> exportTablesExcludeViewOnly = CatalogUtil.getExportTablesExcludeViewOnly(catalogMap);
        HashSet hashSet = new HashSet();
        Iterator<Table> it = exportTablesExcludeViewOnly.iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().getTypeName());
        }
        HashMap hashMap = new HashMap();
        for (File file : fileArr) {
            if (file.getName().endsWith(".pbd")) {
                PbdSegmentName parseFile = PbdSegmentName.parseFile(exportLog, file);
                if (parseFile.m_nonce != null) {
                    String str = parseFile.m_nonce;
                    if (hashSet.contains(getStreamNameFromNonce(str))) {
                        hashMap.put(str, file);
                    } else {
                        file.delete();
                    }
                } else if (parseFile.m_result == PbdSegmentName.Result.NOT_PBD) {
                    exportLog.warn(file.getAbsolutePath() + " is not a PBD file.");
                } else if (parseFile.m_result == PbdSegmentName.Result.INVALID_NAME) {
                    exportLog.warn(file.getAbsolutePath() + " doesn't have valid PBD name.");
                }
            }
        }
        for (File file2 : fileArr) {
            if (file2.getName().endsWith(".ad")) {
                if (((File) hashMap.get(getNonceFromAdFile(file2))) != null) {
                    try {
                        addDataSource(file2, map, arrayList, exportDataProcessor, j);
                    } catch (IOException e) {
                        VoltDB.crashLocalVoltDB("Error intializing export datasource " + file2, true, e);
                    }
                } else {
                    file2.delete();
                }
            }
        }
        Set<Integer> keySet = map.keySet();
        HashSet hashSet2 = new HashSet(arrayList);
        hashSet2.removeAll(keySet);
        if (hashSet2.isEmpty()) {
            return;
        }
        createAckMailboxesIfNeeded(hashSet2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initializeGenerationFromCatalog(CatalogContext catalogContext, CatalogMap<Connector> catalogMap, ExportDataProcessor exportDataProcessor, int i, Map<Integer, Integer> map, boolean z) {
        this.m_catalogVersion = catalogContext.catalogVersion;
        if (exportLog.isDebugEnabled()) {
            exportLog.debug("Updating to catalog version : " + this.m_catalogVersion);
        }
        HashSet hashSet = new HashSet();
        synchronized (this.m_dataSourcesByPartition) {
            Iterator<Map<String, ExportDataSource>> it = this.m_dataSourcesByPartition.values().iterator();
            while (it.hasNext()) {
                hashSet.addAll(it.next().keySet());
            }
        }
        if (exportLog.isDebugEnabled()) {
            exportLog.debug("Current tables: " + hashSet);
        }
        Set<Integer> keySet = map.keySet();
        boolean z2 = false;
        NavigableSet<Table> exportTablesExcludeViewOnly = CatalogUtil.getExportTablesExcludeViewOnly(catalogMap);
        HashSet hashSet2 = new HashSet();
        for (Table table : exportTablesExcludeViewOnly) {
            addDataSources(table, i, map, keySet, exportDataProcessor, catalogContext.m_genId, z);
            hashSet2.add(table.getTypeName());
            z2 = true;
        }
        updateStreamStatus(hashSet2);
        Iterator<String> it2 = hashSet2.iterator();
        while (it2.hasNext()) {
            hashSet.remove(it2.next());
        }
        if (!hashSet.isEmpty()) {
            removeDataSources(hashSet);
        }
        createAckMailboxesIfNeeded(z2 ? keySet : new HashSet<>());
    }

    private void updateStreamStatus(Set<String> set) {
        synchronized (this.m_dataSourcesByPartition) {
            for (Map<String, ExportDataSource> map : this.m_dataSourcesByPartition.values()) {
                for (String str : map.keySet()) {
                    ExportDataSource exportDataSource = map.get(str);
                    if (!set.contains(str)) {
                        exportDataSource.setStatus(ExportDataSource.StreamStatus.DROPPED);
                    } else if (exportDataSource.getStatus() == ExportDataSource.StreamStatus.DROPPED) {
                        exportDataSource.setStatus(ExportDataSource.StreamStatus.ACTIVE);
                    }
                }
            }
        }
    }

    private void createAckMailboxesIfNeeded(Set<Integer> set) {
        this.m_mailboxesZKPath = "/db/export_generations/mailboxes";
        if (this.m_mbox == null) {
            this.m_mbox = new LocalMailbox(this.m_messenger) { // from class: org.voltdb.export.ExportGeneration.1
                @Override // org.voltdb.messaging.LocalMailbox, org.voltcore.messaging.Mailbox
                public void deliver(VoltMessage voltMessage) {
                    if (!(voltMessage instanceof BinaryPayloadMessage)) {
                        ExportGeneration.exportLog.error("Received unexpected message " + voltMessage + " in export subsystem");
                        return;
                    }
                    ByteBuffer wrap = ByteBuffer.wrap(((BinaryPayloadMessage) voltMessage).m_payload);
                    byte b = wrap.get();
                    int i = wrap.getInt();
                    Map map = (Map) ExportGeneration.this.m_dataSourcesByPartition.get(Integer.valueOf(i));
                    byte[] bArr = new byte[wrap.getInt()];
                    wrap.get(bArr);
                    String str = new String(bArr, Constants.UTF8ENCODING);
                    if (map == null) {
                        if (ExportGeneration.this.m_removingPartitions.contains(Integer.valueOf(i))) {
                            return;
                        }
                        ExportGeneration.exportLogLimited.log("Received an export message " + ((int) b) + " for partition " + i + " which does not exist on this node: this should be a transient condition.", EstTime.currentTimeMillis());
                        return;
                    }
                    ExportDataSource exportDataSource = (ExportDataSource) map.get(str);
                    if (exportDataSource == null) {
                        ExportGeneration.exportLogLimited.log("Received export message " + ((int) b) + " for partition " + i + " source " + str + " which does not exist on this node: this should be a transient condition. Sources = " + map, EstTime.currentTimeMillis());
                        return;
                    }
                    if (b != 1) {
                        ExportGeneration.exportLog.error("Received unsupported message type " + voltMessage + " in export subsystem");
                        return;
                    }
                    long j = wrap.getLong();
                    long j2 = wrap.getLong();
                    try {
                        if (j2 < exportDataSource.getGenerationIdCreated()) {
                            if (ExportGeneration.exportLog.isDebugEnabled()) {
                                ExportGeneration.exportLog.debug("Ignored stale RELEASE_BUFFER message for " + exportDataSource.toString() + " , sequence number: " + j + ", generationIdCreated: " + j2 + " from " + CoreUtils.hsIdToString(voltMessage.m_sourceHSId) + " to " + CoreUtils.hsIdToString(ExportGeneration.this.m_mbox.getHSId()));
                            }
                        } else {
                            if (ExportGeneration.exportLog.isDebugEnabled()) {
                                ExportGeneration.exportLog.debug("Received RELEASE_BUFFER message for " + exportDataSource.toString() + " , sequence number: " + j + ", generationIdCreated: " + j2 + " from " + CoreUtils.hsIdToString(voltMessage.m_sourceHSId) + " to " + CoreUtils.hsIdToString(ExportGeneration.this.m_mbox.getHSId()));
                            }
                            exportDataSource.remoteAck(j);
                        }
                    } catch (RejectedExecutionException e) {
                    }
                }
            };
            this.m_messenger.createMailbox(null, this.m_mbox);
        }
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            updateAckMailboxes(it.next().intValue(), null);
        }
        updateReplicaList(set);
    }

    public void updateAckMailboxes(int i, Set<Long> set) {
        ImmutableList<Long> immutableList = this.m_replicasHSIds.get(Integer.valueOf(i));
        synchronized (this.m_dataSourcesByPartition) {
            Map<String, ExportDataSource> map = this.m_dataSourcesByPartition.get(Integer.valueOf(i));
            if (map == null) {
                return;
            }
            Iterator<ExportDataSource> it = map.values().iterator();
            while (it.hasNext()) {
                it.next().updateAckMailboxes(Pair.of(this.m_mbox, immutableList));
            }
        }
    }

    private void removeMailbox(final int i) {
        String str = this.m_mailboxesZKPath + Tokens.T_DIVIDE + i;
        this.m_removingPartitions.add(Integer.valueOf(i));
        this.m_messenger.getZK().delete(str + Tokens.T_DIVIDE + this.m_mbox.getHSId(), -1, new ZKUtil.VoidCallback() { // from class: org.voltdb.export.ExportGeneration.2
            @Override // org.voltcore.zk.ZKUtil.VoidCallback, org.apache.zookeeper_voltpatches.AsyncCallback.VoidCallback
            public void processResult(int i2, String str2, Object obj) {
                super.processResult(i2, str2, obj);
                ExportGeneration.this.m_removingPartitions.remove(Integer.valueOf(i));
            }
        }, null);
        this.m_replicasHSIds.remove(Integer.valueOf(i));
    }

    private void updateReplicaList(final Set<Integer> set) {
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            String str = this.m_mailboxesZKPath + Tokens.T_DIVIDE + it.next();
            ZKUtil.asyncMkdirs(this.m_messenger.getZK(), str);
            this.m_messenger.getZK().create(str + Tokens.T_DIVIDE + this.m_mbox.getHSId(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new ZKUtil.StringCallback(), null);
        }
        try {
            m_childUpdatingThread.submit(new Runnable() { // from class: org.voltdb.export.ExportGeneration.3
                @Override // java.lang.Runnable
                public void run() {
                    ArrayList<Pair> arrayList = new ArrayList();
                    for (Integer num : set) {
                        ZKUtil.ChildrenCallback childrenCallback = new ZKUtil.ChildrenCallback();
                        ExportGeneration.this.m_messenger.getZK().getChildren(ExportGeneration.this.m_mailboxesZKPath + Tokens.T_DIVIDE + num, ExportGeneration.this.constructMailboxChildWatcher(), childrenCallback, (Object) null);
                        arrayList.add(Pair.of(num, childrenCallback));
                    }
                    for (Pair pair : arrayList) {
                        Integer num2 = (Integer) pair.getFirst();
                        try {
                            List<String> list = ((ZKUtil.ChildrenCallback) pair.getSecond()).get();
                            ImmutableList.Builder builder = ImmutableList.builder();
                            for (String str2 : list) {
                                if (!str2.equals(Long.toString(ExportGeneration.this.m_mbox.getHSId()))) {
                                    builder.add((ImmutableList.Builder) Long.valueOf(str2));
                                }
                            }
                            ExportGeneration.this.m_replicasHSIds.put(num2, builder.build());
                            ExportGeneration.this.updateAckMailboxes(num2.intValue(), null);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        } catch (KeeperException e2) {
                            throw new RuntimeException(e2);
                        }
                    }
                }
            }).get();
        } catch (Throwable th) {
            throw new RuntimeException(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Watcher constructMailboxChildWatcher() {
        if (this.m_shutdown) {
            return null;
        }
        return new Watcher() { // from class: org.voltdb.export.ExportGeneration.4
            @Override // org.apache.zookeeper_voltpatches.Watcher
            public void process(final WatchedEvent watchedEvent) {
                ExportGeneration.m_childUpdatingThread.submit(new Runnable() { // from class: org.voltdb.export.ExportGeneration.4.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            ExportGeneration.this.handleChildUpdate(watchedEvent);
                        } catch (Throwable th) {
                            VoltDB.crashLocalVoltDB("Error in export ack handling", true, th);
                        }
                    }
                });
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleChildUpdate(WatchedEvent watchedEvent) {
        if (this.m_shutdown) {
            return;
        }
        this.m_messenger.getZK().getChildren(watchedEvent.getPath(), constructMailboxChildWatcher(), constructChildRetrievalCallback(), (Object) null);
    }

    private AsyncCallback.ChildrenCallback constructChildRetrievalCallback() {
        if (this.m_shutdown) {
            return null;
        }
        return new AsyncCallback.ChildrenCallback() { // from class: org.voltdb.export.ExportGeneration.5
            @Override // org.apache.zookeeper_voltpatches.AsyncCallback.ChildrenCallback
            public void processResult(final int i, final String str, Object obj, final List<String> list) {
                ExportGeneration.m_childUpdatingThread.submit(new Runnable() { // from class: org.voltdb.export.ExportGeneration.5.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            if (ExportGeneration.this.m_shutdown) {
                                return;
                            }
                            KeeperException.Code code = KeeperException.Code.get(i);
                            if (code == KeeperException.Code.NONODE) {
                                if (ExportGeneration.exportLog.isDebugEnabled()) {
                                    ExportGeneration.exportLog.debug("Path not found generation drain most likely finished on other node: " + str);
                                }
                            } else if (code != KeeperException.Code.OK) {
                                throw KeeperException.create(code);
                            }
                            String[] split = str.split(Tokens.T_DIVIDE);
                            int intValue = Integer.valueOf(split[split.length - 1]).intValue();
                            ImmutableList immutableList = (ImmutableList) ExportGeneration.this.m_replicasHSIds.get(Integer.valueOf(intValue));
                            if (immutableList == null) {
                                return;
                            }
                            if (ExportGeneration.exportLog.isDebugEnabled()) {
                                ExportGeneration.exportLog.debug("Process children change: " + str);
                            }
                            ImmutableList.Builder builder = ImmutableList.builder();
                            for (String str2 : list) {
                                if (!str2.equals(Long.toString(ExportGeneration.this.m_mbox.getHSId()))) {
                                    builder.add((ImmutableList.Builder) Long.valueOf(str2));
                                }
                            }
                            ImmutableList build = builder.build();
                            Sets.SetView difference = Sets.difference(new HashSet(build), new HashSet(immutableList));
                            if (ExportGeneration.exportLog.isDebugEnabled()) {
                                ExportGeneration.exportLog.debug("Current export generation added mailbox: " + CoreUtils.hsIdCollectionToString(difference) + ", removed mailbox: " + CoreUtils.hsIdCollectionToString(Sets.difference(new HashSet((Collection) ExportGeneration.this.m_replicasHSIds.get(Integer.valueOf(intValue))), new HashSet(build))));
                            }
                            ExportGeneration.this.m_replicasHSIds.put(Integer.valueOf(intValue), build);
                            ExportGeneration.this.updateAckMailboxes(intValue, difference);
                        } catch (Throwable th) {
                            VoltDB.crashLocalVoltDB("Error in export ack handling", true, th);
                        }
                    }
                });
            }
        };
    }

    @Override // org.voltdb.export.Generation
    public List<ExportStatsBase.ExportStatsRow> getStats(boolean z) {
        ArrayList arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        synchronized (this.m_dataSourcesByPartition) {
            hashMap.putAll(this.m_dataSourcesByPartition);
        }
        Iterator it = hashMap.values().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((Map) it.next()).values().iterator();
            while (it2.hasNext()) {
                ListenableFuture<ExportStatsBase.ExportStatsRow> immutableStatsRow = ((ExportDataSource) it2.next()).getImmutableStatsRow(z);
                if (immutableStatsRow != null) {
                    arrayList.add(immutableStatsRow);
                }
            }
        }
        try {
            if (!arrayList.isEmpty()) {
                return (List) Futures.allAsList(arrayList).get();
            }
        } catch (Exception e) {
            exportLog.error("Unexpected exception syncing export data during snapshot save.", e);
        }
        return new ArrayList();
    }

    private void addDataSource(File file, Map<Integer, Integer> map, List<Integer> list, ExportDataProcessor exportDataProcessor, long j) throws IOException {
        ExportDataSource exportDataSource = new ExportDataSource(this, file, map, exportDataProcessor, j);
        exportDataSource.setCoordination(this.m_messenger.getZK(), Integer.valueOf(this.m_messenger.getHostId()));
        list.add(Integer.valueOf(exportDataSource.getPartitionId()));
        if (CatalogUtil.isPersistentMigrate(exportDataSource.getTableName())) {
            exportDataSource.setupMigrateRowsDeleter(CatalogUtil.getIsreplicated(exportDataSource.getTableName()) ? 16383 : exportDataSource.getPartitionId());
        }
        if (exportLog.isDebugEnabled()) {
            exportLog.debug("Creating " + exportDataSource.toString() + " for " + file + " bytes " + exportDataSource.sizeInBytes());
        }
        synchronized (this.m_dataSourcesByPartition) {
            Map<String, ExportDataSource> map2 = this.m_dataSourcesByPartition.get(Integer.valueOf(exportDataSource.getPartitionId()));
            if (map2 == null) {
                map2 = new HashMap();
                this.m_dataSourcesByPartition.put(Integer.valueOf(exportDataSource.getPartitionId()), map2);
            } else if (map2.get(exportDataSource.getTableName()) != null) {
                exportLog.warn("On Disk generation with same table, partition already exists using known data source.");
                return;
            }
            map2.put(exportDataSource.getTableName(), exportDataSource);
        }
    }

    private void addDataSources(Table table, int i, Map<Integer, Integer> map, Set<Integer> set, ExportDataProcessor exportDataProcessor, long j, boolean z) {
        for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
            int intValue = entry.getKey().intValue();
            int intValue2 = entry.getValue().intValue();
            synchronized (this.m_dataSourcesByPartition) {
                try {
                    Map<String, ExportDataSource> map2 = this.m_dataSourcesByPartition.get(Integer.valueOf(intValue));
                    if (map2 == null) {
                        map2 = new HashMap();
                        this.m_dataSourcesByPartition.put(Integer.valueOf(intValue), map2);
                    }
                    String typeName = table.getTypeName();
                    if (map2.containsKey(typeName)) {
                        ExportDataSource exportDataSource = map2.get(typeName);
                        ExportClientBase exportClient = exportDataProcessor.getExportClient(typeName);
                        if (exportClient != null) {
                            exportDataSource.setClient(exportClient);
                            exportDataSource.setRunEveryWhere(exportClient.isRunEverywhere());
                        } else {
                            exportDataSource.setClient(null);
                            exportDataSource.setRunEveryWhere(false);
                        }
                        exportDataSource.markInCatalog(set.contains(Integer.valueOf(intValue)));
                        if (z) {
                            exportDataSource.updateCatalog(table, j);
                        }
                    } else {
                        ExportDataSource exportDataSource2 = new ExportDataSource(this, exportDataProcessor, typeName, intValue, intValue2, j, table.getColumns(), table.getPartitioncolumn(), this.m_directory.getPath());
                        exportDataSource2.setCoordination(this.m_messenger.getZK(), Integer.valueOf(this.m_messenger.getHostId()));
                        if (TableType.isPersistentMigrate(table.getTabletype())) {
                            exportDataSource2.setupMigrateRowsDeleter(table.getIsreplicated() ? 16383 : exportDataSource2.getPartitionId());
                        }
                        if (exportLog.isDebugEnabled()) {
                            exportLog.debug("Creating ExportDataSource for table in catalog " + typeName + " partition " + intValue + " site " + intValue2);
                        }
                        map2.put(typeName, exportDataSource2);
                        if (z) {
                            exportDataSource2.updateCatalog(table, j);
                        }
                    }
                } catch (IOException e) {
                    VoltDB.crashLocalVoltDB("Error creating datasources for table " + table.getTypeName() + " host id " + i, true, e);
                }
            }
        }
    }

    private void removeDataSources(Set<String> set) {
        LinkedList<ExportDataSource> linkedList = new LinkedList();
        synchronized (this.m_dataSourcesByPartition) {
            for (Map<String, ExportDataSource> map : this.m_dataSourcesByPartition.values()) {
                for (String str : set) {
                    ExportDataSource exportDataSource = map.get(str);
                    if (exportDataSource != null) {
                        linkedList.add(exportDataSource);
                        map.remove(str);
                    }
                }
            }
        }
        for (ExportDataSource exportDataSource2 : linkedList) {
            exportLog.info("Finished processing " + exportDataSource2);
            VoltDB.getExportManager().onClosingSource(exportDataSource2.getTableName(), exportDataSource2.getPartitionId());
            exportDataSource2.closeAndDelete();
        }
    }

    @Override // org.voltdb.export.Generation
    public void onSourceDrained(int i, String str) {
        synchronized (this.m_dataSourcesByPartition) {
            Map<String, ExportDataSource> map = this.m_dataSourcesByPartition.get(Integer.valueOf(i));
            if (map == null) {
                if (!this.m_removingPartitions.contains(Integer.valueOf(i))) {
                    exportLog.error("Could not find export data sources for partition " + i + ". The export cleanup stream is being discarded.");
                }
                return;
            }
            ExportDataSource exportDataSource = map.get(str);
            if (exportDataSource == null) {
                exportLog.warn("Could not find export data source for signature " + i + " name " + str + ". The export cleanup stream is being discarded.");
                return;
            }
            exportLog.info("Drained source for " + str + ", partition " + i);
            map.remove(str);
            if (map.isEmpty()) {
                this.m_dataSourcesByPartition.remove(Integer.valueOf(i));
                removeMailbox(i);
            }
            VoltDB.getExportManager().onClosingSource(str, i);
            exportDataSource.closeAndDelete();
        }
    }

    @Override // org.voltdb.export.Generation
    public void pushExportBuffer(int i, String str, long j, long j2, int i2, long j3, DBBPool.BBContainer bBContainer) {
        Map<String, ExportDataSource> map = this.m_dataSourcesByPartition.get(Integer.valueOf(i));
        if (map == null) {
            if (!((RealVoltDB) VoltDB.instance()).isPartitionDecommissioned(i)) {
                exportLog.error("PUSH Could not find export data sources for partition " + i + ". The export data is being discarded.");
            }
            if (bBContainer != null) {
                bBContainer.discard();
                return;
            }
            return;
        }
        ExportDataSource exportDataSource = map.get(str);
        if (exportDataSource != null) {
            exportDataSource.pushExportBuffer(j, j2, i2, j3, bBContainer);
            return;
        }
        exportLogLimitedPush.log("PUSH on unknown export data source for partition " + i + " Table " + str + ". The export data (seq: " + j + ", count: " + i2 + ") is being discarded.", EstTime.currentTimeMillis());
        if (bBContainer != null) {
            bBContainer.discard();
        }
    }

    private void cleanup() {
        this.m_shutdown = true;
        if (this.m_mbox == null || this.m_messenger == null) {
            return;
        }
        synchronized (this.m_dataSourcesByPartition) {
            Iterator<Integer> it = this.m_dataSourcesByPartition.keySet().iterator();
            while (it.hasNext()) {
                try {
                    this.m_messenger.getZK().delete((this.m_mailboxesZKPath + Tokens.T_DIVIDE + it.next()) + Tokens.T_DIVIDE + this.m_mbox.getHSId(), 0);
                } catch (InterruptedException e) {
                } catch (KeeperException e2) {
                }
            }
        }
        this.m_messenger.removeMailbox(this.m_mbox);
    }

    @Override // org.voltdb.export.Generation
    public void updateInitialExportStateToSeqNo(int i, String str, ExportDataSource.StreamStartAction streamStartAction, Map<Integer, SnapshotCompletionMonitor.ExportSnapshotTuple> map) {
        ExportDataSource exportDataSource;
        SnapshotCompletionMonitor.ExportSnapshotTuple exportSnapshotTuple;
        ArrayList arrayList = new ArrayList();
        Map<String, ExportDataSource> map2 = this.m_dataSourcesByPartition.get(Integer.valueOf(i));
        if (map2 != null && (exportDataSource = map2.get(str)) != null && (exportSnapshotTuple = map.get(Integer.valueOf(i))) != null) {
            arrayList.add(exportDataSource.truncateExportToSeqNo(streamStartAction, exportSnapshotTuple.getSequenceNumber(), exportSnapshotTuple.getGenerationId()));
        }
        try {
            if (!arrayList.isEmpty()) {
                Futures.allAsList(arrayList).get();
            }
        } catch (Exception e) {
            VoltDB.crashLocalVoltDB("Unexpected exception truncating export data during snapshot restore. You can back up export overflow data and start the DB without it to get past this error", true, e);
        }
    }

    @Override // org.voltdb.export.Generation
    public void updateDanglingExportStates(ExportDataSource.StreamStartAction streamStartAction, Map<String, Map<Integer, SnapshotCompletionMonitor.ExportSnapshotTuple>> map) {
        ArrayList arrayList = new ArrayList();
        synchronized (this.m_dataSourcesByPartition) {
            Iterator<Map<String, ExportDataSource>> it = this.m_dataSourcesByPartition.values().iterator();
            while (it.hasNext()) {
                for (ExportDataSource exportDataSource : it.next().values()) {
                    if (!exportDataSource.inCatalog()) {
                        Map<Integer, SnapshotCompletionMonitor.ExportSnapshotTuple> map2 = map.get(exportDataSource.getTableName());
                        if (map2 == null) {
                            exportLog.warn("Could not find export sequence number for table " + exportDataSource.getTableName() + ". This warning is safe to ignore if you are loading a pre 1.3 snapshot which would not contain these sequence numbers (added in 1.3). If this is a post 1.3 snapshot then the restore has failed and export sequence  are reset to 0");
                        } else {
                            SnapshotCompletionMonitor.ExportSnapshotTuple exportSnapshotTuple = map2.get(Integer.valueOf(exportDataSource.getPartitionId()));
                            if (exportSnapshotTuple != null) {
                                if (exportLog.isDebugEnabled()) {
                                    exportLog.debug("Updating dangling export " + exportDataSource);
                                }
                                arrayList.add(exportDataSource.truncateExportToSeqNo(streamStartAction, exportSnapshotTuple.getSequenceNumber(), exportSnapshotTuple.getGenerationId()));
                            } else {
                                exportLog.warn("Could not find an export sequence number for table " + exportDataSource.getTableName() + " partition " + exportDataSource.getPartitionId() + ". This warning is safe to ignore if you are loading a pre 1.3 snapshot  which would not contain these sequence numbers (added in 1.3). If this is a post 1.3 snapshot then the restore has failed and export sequence  are reset to 0");
                            }
                        }
                    }
                }
            }
        }
        try {
            if (!arrayList.isEmpty()) {
                Futures.allAsList(arrayList).get();
            }
        } catch (Exception e) {
            VoltDB.crashLocalVoltDB("Unexpected exception truncating export data during snapshot restore. You can back up export overflow data and start the DB without it to get past this error", true, e);
        }
    }

    @Override // org.voltdb.export.Generation
    public void sync() {
        ArrayList arrayList = new ArrayList();
        synchronized (this.m_dataSourcesByPartition) {
            Iterator<Map<String, ExportDataSource>> it = this.m_dataSourcesByPartition.values().iterator();
            while (it.hasNext()) {
                Iterator<ExportDataSource> it2 = it.next().values().iterator();
                while (it2.hasNext()) {
                    ListenableFuture<?> sync = it2.next().sync();
                    if (sync != null) {
                        arrayList.add(sync);
                    }
                }
            }
        }
        try {
            if (!arrayList.isEmpty()) {
                Futures.allAsList(arrayList).get();
            }
        } catch (Exception e) {
            exportLog.error("Unexpected exception syncing export data during snapshot save.", e);
        }
    }

    @Override // org.voltdb.export.Generation
    public void shutdown() {
        ArrayList arrayList = new ArrayList();
        synchronized (this.m_dataSourcesByPartition) {
            Iterator<Map<String, ExportDataSource>> it = this.m_dataSourcesByPartition.values().iterator();
            while (it.hasNext()) {
                Iterator<ExportDataSource> it2 = it.next().values().iterator();
                while (it2.hasNext()) {
                    arrayList.add(it2.next().shutdown());
                }
            }
        }
        try {
            if (!arrayList.isEmpty()) {
                Futures.allAsList(arrayList).get();
            }
        } catch (Exception e) {
            exportLog.error("Unexpected exception shutting down export data.", e);
        }
        this.m_shutdown = true;
        cleanup();
    }

    public void onProcessorShutdown() {
        synchronized (this.m_dataSourcesByPartition) {
            Iterator<Map<String, ExportDataSource>> it = this.m_dataSourcesByPartition.values().iterator();
            while (it.hasNext()) {
                Iterator<ExportDataSource> it2 = it.next().values().iterator();
                while (it2.hasNext()) {
                    it2.next().onProcessorShutdown();
                }
            }
        }
    }

    @Override // org.voltdb.export.Generation
    public void becomeLeader(int i) {
        synchronized (this.m_dataSourcesByPartition) {
            Map<String, ExportDataSource> map = this.m_dataSourcesByPartition.get(Integer.valueOf(i));
            if (map == null) {
                return;
            }
            Iterator<ExportDataSource> it = map.values().iterator();
            while (it.hasNext()) {
                try {
                    it.next().becomeLeader();
                } catch (Exception e) {
                    exportLog.error("Unable to start exporting", e);
                }
            }
        }
    }

    @Override // org.voltdb.export.Generation
    public Map<Integer, Map<String, ExportDataSource>> getDataSourceByPartition() {
        return this.m_dataSourcesByPartition;
    }

    public void processStreamControl(String str, List<String> list, StreamControlOperation streamControlOperation, VoltTable voltTable) {
        exportLog.info("Export " + streamControlOperation + " source:" + str + " targets:" + list);
        TreeSet treeSet = new TreeSet(String.CASE_INSENSITIVE_ORDER);
        treeSet.addAll(list);
        synchronized (this.m_dataSourcesByPartition) {
            for (Map.Entry<Integer, Map<String, ExportDataSource>> entry : this.m_dataSourcesByPartition.entrySet()) {
                Integer key = entry.getKey();
                for (ExportDataSource exportDataSource : entry.getValue().values()) {
                    if (exportDataSource.getTableName().equalsIgnoreCase(str) && treeSet.contains(exportDataSource.getTarget())) {
                        if (exportDataSource.processStreamControl(streamControlOperation)) {
                            voltTable.addRow(exportDataSource.getTableName(), exportDataSource.getTarget(), key, "SUCCESS", "");
                        }
                    }
                }
            }
        }
    }

    public void closeDataSources(List<Integer> list) {
        synchronized (this.m_dataSourcesByPartition) {
            for (Map.Entry<Integer, Map<String, ExportDataSource>> entry : this.m_dataSourcesByPartition.entrySet()) {
                if (list.contains(entry.getKey())) {
                    Iterator<ExportDataSource> it = entry.getValue().values().iterator();
                    while (it.hasNext()) {
                        it.next().closeAndDelete();
                    }
                }
            }
            this.m_dataSourcesByPartition.keySet().removeAll(list);
            list.stream().forEach(num -> {
                removeMailbox(num.intValue());
            });
            if (exportLog.isDebugEnabled()) {
                exportLog.info("Remaining datasources:" + this.m_dataSourcesByPartition);
            }
        }
    }

    @Override // org.voltdb.export.Generation
    public void updateGenerationId(long j) {
        synchronized (this.m_dataSourcesByPartition) {
            Iterator<Map<String, ExportDataSource>> it = this.m_dataSourcesByPartition.values().iterator();
            while (it.hasNext()) {
                Iterator<ExportDataSource> it2 = it.next().values().iterator();
                while (it2.hasNext()) {
                    it2.next().updateGenerationId(j);
                }
            }
        }
    }

    private static String getStreamNameFromNonce(String str) {
        return str.substring(0, str.lastIndexOf(95));
    }

    private static String getNonceFromAdFile(File file) {
        return file.getName().substring(0, file.getName().lastIndexOf(46));
    }

    public String toString() {
        return "Export Generation";
    }

    @Override // org.voltdb.export.Generation
    public int getCatalogVersion() {
        return this.m_catalogVersion;
    }
}
