package org.apache.flink.cdc.connectors.mysql.source.assigners;

import io.debezium.relational.TableId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.class */
public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlHybridSplitAssigner.class);
    private static final String BINLOG_SPLIT_ID = "binlog-split";
    private final int splitMetaGroupSize;
    private final MySqlSourceConfig sourceConfig;
    private boolean isBinlogSplitAssigned;
    private final MySqlSnapshotSplitAssigner snapshotSplitAssigner;

    public MySqlHybridSplitAssigner(MySqlSourceConfig mySqlSourceConfig, int i, List<TableId> list, boolean z, SplitEnumeratorContext<MySqlSplit> splitEnumeratorContext) {
        this(mySqlSourceConfig, new MySqlSnapshotSplitAssigner(mySqlSourceConfig, i, list, z, splitEnumeratorContext), false, mySqlSourceConfig.getSplitMetaGroupSize());
    }

    public MySqlHybridSplitAssigner(MySqlSourceConfig mySqlSourceConfig, int i, HybridPendingSplitsState hybridPendingSplitsState, SplitEnumeratorContext<MySqlSplit> splitEnumeratorContext) {
        this(mySqlSourceConfig, new MySqlSnapshotSplitAssigner(mySqlSourceConfig, i, hybridPendingSplitsState.getSnapshotPendingSplits(), splitEnumeratorContext), hybridPendingSplitsState.isBinlogSplitAssigned(), mySqlSourceConfig.getSplitMetaGroupSize());
    }

    private MySqlHybridSplitAssigner(MySqlSourceConfig mySqlSourceConfig, MySqlSnapshotSplitAssigner mySqlSnapshotSplitAssigner, boolean z, int i) {
        this.sourceConfig = mySqlSourceConfig;
        this.snapshotSplitAssigner = mySqlSnapshotSplitAssigner;
        this.isBinlogSplitAssigned = z;
        this.splitMetaGroupSize = i;
    }

    @Override // org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void open() {
        this.snapshotSplitAssigner.open();
    }

    @Override // org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public Optional<MySqlSplit> getNext() {
        if (AssignerStatus.isNewlyAddedAssigningSnapshotFinished(getAssignerStatus())) {
            return Optional.empty();
        }
        if (!this.snapshotSplitAssigner.noMoreSplits()) {
            return this.snapshotSplitAssigner.getNext();
        }
        if (this.isBinlogSplitAssigned) {
            return Optional.empty();
        }
        if (AssignerStatus.isInitialAssigningFinished(this.snapshotSplitAssigner.getAssignerStatus())) {
            this.isBinlogSplitAssigned = true;
            return Optional.of(createBinlogSplit());
        }
        if (!AssignerStatus.isNewlyAddedAssigningFinished(this.snapshotSplitAssigner.getAssignerStatus())) {
            return Optional.empty();
        }
        this.isBinlogSplitAssigned = true;
        return Optional.empty();
    }

    @Override // org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public boolean waitingForFinishedSplits() {
        return this.snapshotSplitAssigner.waitingForFinishedSplits();
    }

    @Override // org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
        return this.snapshotSplitAssigner.getFinishedSplitInfos();
    }

    @Override // org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void onFinishedSplits(Map<String, BinlogOffset> map) {
        this.snapshotSplitAssigner.onFinishedSplits(map);
    }

    @Override // org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void addSplits(Collection<MySqlSplit> collection) {
        ArrayList arrayList = new ArrayList();
        for (MySqlSplit mySqlSplit : collection) {
            if (mySqlSplit.isSnapshotSplit()) {
                arrayList.add(mySqlSplit);
            } else {
                this.isBinlogSplitAssigned = false;
            }
        }
        this.snapshotSplitAssigner.addSplits(arrayList);
    }

    @Override // org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public PendingSplitsState snapshotState(long j) {
        return new HybridPendingSplitsState(this.snapshotSplitAssigner.snapshotState(j), this.isBinlogSplitAssigned);
    }

    @Override // org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void notifyCheckpointComplete(long j) {
        this.snapshotSplitAssigner.notifyCheckpointComplete(j);
    }

    @Override // org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public AssignerStatus getAssignerStatus() {
        return this.snapshotSplitAssigner.getAssignerStatus();
    }

    @Override // org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public boolean noMoreSplits() {
        return this.snapshotSplitAssigner.noMoreSplits() && this.isBinlogSplitAssigned;
    }

    @Override // org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void startAssignNewlyAddedTables() {
        this.snapshotSplitAssigner.startAssignNewlyAddedTables();
    }

    @Override // org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void onBinlogSplitUpdated() {
        this.snapshotSplitAssigner.onBinlogSplitUpdated();
    }

    @Override // org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.snapshotSplitAssigner.close();
    }

    private MySqlBinlogSplit createBinlogSplit() {
        List<MySqlSchemalessSnapshotSplit> list = (List) this.snapshotSplitAssigner.getAssignedSplits().values().stream().sorted(Comparator.comparing((v0) -> {
            return v0.splitId();
        })).collect(Collectors.toList());
        Map<String, BinlogOffset> splitFinishedOffsets = this.snapshotSplitAssigner.getSplitFinishedOffsets();
        ArrayList arrayList = new ArrayList();
        BinlogOffset binlogOffset = null;
        BinlogOffset binlogOffset2 = null;
        for (MySqlSchemalessSnapshotSplit mySqlSchemalessSnapshotSplit : list) {
            BinlogOffset binlogOffset3 = splitFinishedOffsets.get(mySqlSchemalessSnapshotSplit.splitId());
            if (binlogOffset == null || binlogOffset3.isBefore(binlogOffset)) {
                binlogOffset = binlogOffset3;
            }
            if (binlogOffset2 == null || binlogOffset3.isAfter(binlogOffset2)) {
                binlogOffset2 = binlogOffset3;
            }
            arrayList.add(new FinishedSnapshotSplitInfo(mySqlSchemalessSnapshotSplit.getTableId(), mySqlSchemalessSnapshotSplit.splitId(), mySqlSchemalessSnapshotSplit.getSplitStart(), mySqlSchemalessSnapshotSplit.getSplitEnd(), binlogOffset3));
        }
        BinlogOffset ofNonStopping = BinlogOffset.ofNonStopping();
        if (this.sourceConfig.getStartupOptions().isSnapshotOnly()) {
            ofNonStopping = binlogOffset2;
        }
        return new MySqlBinlogSplit("binlog-split", binlogOffset == null ? BinlogOffset.ofEarliest() : binlogOffset, ofNonStopping, arrayList.size() > this.splitMetaGroupSize ? new ArrayList() : arrayList, new HashMap(), arrayList.size());
    }
}
