package org.apache.flink.cdc.runtime.operators.schema.regular;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry;
import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.transform.PreTransformChangeInfo;
import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.class */
public class SchemaCoordinator extends SchemaRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaCoordinator.class);
    private final ExecutorService schemaChangeThreadPool;
    private transient RequestStatus schemaChangeStatus;
    private transient ConcurrentHashMap<Integer, Set<Integer>> flushedSinkWriters;
    private transient CompletableFuture<CoordinationResponse> pendingResponseFuture;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator$RequestStatus.class */
    public enum RequestStatus {
        IDLE,
        APPLYING
    }

    public SchemaCoordinator(String str, OperatorCoordinator.Context context, ExecutorService executorService, MetadataApplier metadataApplier, List<RouteRule> list, SchemaChangeBehavior schemaChangeBehavior, Duration duration) {
        super(context, str, executorService, metadataApplier, list, schemaChangeBehavior, duration);
        this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
    }

    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    public void start() throws Exception {
        super.start();
        this.flushedSinkWriters = new ConcurrentHashMap<>();
        this.schemaChangeStatus = RequestStatus.IDLE;
    }

    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    public void close() throws Exception {
        super.close();
        if (this.schemaChangeThreadPool == null || this.schemaChangeThreadPool.isShutdown()) {
            return;
        }
        this.schemaChangeThreadPool.shutdownNow();
    }

    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    protected void snapshot(CompletableFuture<byte[]> completableFuture) throws Exception {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Throwable th = null;
        try {
            DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
            Throwable th2 = null;
            try {
                try {
                    dataOutputStream.writeInt(SchemaManager.SERIALIZER.getVersion());
                    byte[] serialize = SchemaManager.SERIALIZER.serialize(this.schemaManager);
                    dataOutputStream.writeInt(serialize.length);
                    dataOutputStream.write(serialize);
                    dataOutputStream.writeInt(0);
                    completableFuture.complete(byteArrayOutputStream.toByteArray());
                    if (dataOutputStream != null) {
                        if (0 != 0) {
                            try {
                                dataOutputStream.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            dataOutputStream.close();
                        }
                    }
                    if (byteArrayOutputStream != null) {
                        if (0 == 0) {
                            byteArrayOutputStream.close();
                            return;
                        }
                        try {
                            byteArrayOutputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (dataOutputStream != null) {
                    if (th2 != null) {
                        try {
                            dataOutputStream.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        dataOutputStream.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (byteArrayOutputStream != null) {
                if (0 != 0) {
                    try {
                        byteArrayOutputStream.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    byteArrayOutputStream.close();
                }
            }
            throw th8;
        }
    }

    /* JADX WARN: Failed to calculate best type for var: r10v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r10v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r9v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r9v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 10, insn: 0x00de: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r10 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:43:0x00de */
    /* JADX WARN: Not initialized variable reg: 9, insn: 0x00d9: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r9 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:41:0x00d9 */
    /* JADX WARN: Type inference failed for: r10v0, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r9v1, types: [java.io.DataInputStream] */
    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    protected void restore(byte[] bArr) throws Exception {
        ?? r9;
        ?? r10;
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bArr);
        Throwable th = null;
        try {
            try {
                DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream);
                Throwable th2 = null;
                int readInt = dataInputStream.readInt();
                switch (readInt) {
                    case 0:
                        byte[] bArr2 = new byte[dataInputStream.readInt()];
                        dataInputStream.readFully(bArr2);
                        this.schemaManager = SchemaManager.SERIALIZER.m6deserialize(readInt, bArr2);
                        break;
                    case PreTransformChangeInfo.Serializer.VERSION_BEFORE_STATE_COMPATIBILITY /* 1 */:
                    case 2:
                        byte[] bArr3 = new byte[dataInputStream.readInt()];
                        dataInputStream.readFully(bArr3);
                        this.schemaManager = SchemaManager.SERIALIZER.m6deserialize(readInt, bArr3);
                        consumeUnusedSchemaDerivationBytes(dataInputStream);
                        break;
                    default:
                        throw new IOException("Unrecognized serialization version " + readInt);
                }
                if (dataInputStream != null) {
                    if (0 != 0) {
                        try {
                            dataInputStream.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        dataInputStream.close();
                    }
                }
                if (byteArrayInputStream != null) {
                    if (0 == 0) {
                        byteArrayInputStream.close();
                        return;
                    }
                    try {
                        byteArrayInputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                }
            } catch (Throwable th5) {
                if (r9 != 0) {
                    if (r10 != 0) {
                        try {
                            r9.close();
                        } catch (Throwable th6) {
                            r10.addSuppressed(th6);
                        }
                    } else {
                        r9.close();
                    }
                }
                throw th5;
            }
        } catch (Throwable th7) {
            if (byteArrayInputStream != null) {
                if (0 != 0) {
                    try {
                        byteArrayInputStream.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    byteArrayInputStream.close();
                }
            }
            throw th7;
        }
    }

    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    protected void handleCustomCoordinationRequest(CoordinationRequest coordinationRequest, CompletableFuture<CoordinationResponse> completableFuture) {
        if (!(coordinationRequest instanceof SchemaChangeRequest)) {
            throw new UnsupportedOperationException("Unknown coordination request type: " + coordinationRequest);
        }
        handleSchemaChangeRequest((SchemaChangeRequest) coordinationRequest, completableFuture);
    }

    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    protected void handleFlushSuccessEvent(FlushSuccessEvent flushSuccessEvent) {
        int sinkSubTaskId = flushSuccessEvent.getSinkSubTaskId();
        int sourceSubTaskId = flushSuccessEvent.getSourceSubTaskId();
        LOG.info("Sink subtask {} succeed flushing from source subTask {}.", Integer.valueOf(sinkSubTaskId), Integer.valueOf(sourceSubTaskId));
        if (!this.flushedSinkWriters.containsKey(Integer.valueOf(sourceSubTaskId))) {
            this.flushedSinkWriters.put(Integer.valueOf(sourceSubTaskId), ConcurrentHashMap.newKeySet());
        }
        this.flushedSinkWriters.get(Integer.valueOf(sourceSubTaskId)).add(Integer.valueOf(sinkSubTaskId));
        LOG.info("Currently flushed sink writers for source task {} are: {}", Integer.valueOf(sourceSubTaskId), this.flushedSinkWriters.get(Integer.valueOf(sourceSubTaskId)));
    }

    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    protected void handleUnrecoverableError(String str, Throwable th) {
        super.handleUnrecoverableError(str, th);
        if (this.pendingResponseFuture != null) {
            this.pendingResponseFuture.completeExceptionally(th);
        }
    }

    public void handleSchemaChangeRequest(SchemaChangeRequest schemaChangeRequest, CompletableFuture<CoordinationResponse> completableFuture) {
        int subTaskId = schemaChangeRequest.getSubTaskId();
        if (this.schemaChangeStatus != RequestStatus.IDLE) {
            completableFuture.complete(CoordinationResponseUtils.wrap(SchemaChangeResponse.busy()));
            return;
        }
        if (this.activeSinkWriters.size() < this.currentParallelism) {
            LOG.info("Not all active sink writers have been registered. Current {}, expected {}.", Integer.valueOf(this.activeSinkWriters.size()), Integer.valueOf(this.currentParallelism));
            completableFuture.complete(CoordinationResponseUtils.wrap(SchemaChangeResponse.waitingForFlush()));
        } else {
            if (!this.activeSinkWriters.equals(this.flushedSinkWriters.get(Integer.valueOf(subTaskId)))) {
                LOG.info("Not all active sink writers have completed flush. Flushed writers: {}, expected: {}.", this.flushedSinkWriters.get(Integer.valueOf(subTaskId)), this.activeSinkWriters);
                completableFuture.complete(CoordinationResponseUtils.wrap(SchemaChangeResponse.waitingForFlush()));
                return;
            }
            LOG.info("All sink writers have flushed for subTaskId {}. Switching to APPLYING state and starting schema evolution...", Integer.valueOf(subTaskId));
            this.flushedSinkWriters.remove(Integer.valueOf(subTaskId));
            this.schemaChangeStatus = RequestStatus.APPLYING;
            this.pendingResponseFuture = completableFuture;
            startSchemaChangesEvolve(schemaChangeRequest, completableFuture);
        }
    }

    private void startSchemaChangesEvolve(SchemaChangeRequest schemaChangeRequest, CompletableFuture<CoordinationResponse> completableFuture) {
        SchemaChangeEvent schemaChangeEvent = schemaChangeRequest.getSchemaChangeEvent();
        Schema orElse = this.schemaManager.getLatestOriginalSchema(schemaChangeEvent.tableId()).orElse(null);
        ArrayList arrayList = new ArrayList();
        if (SchemaUtils.isSchemaChangeEventRedundant(orElse, schemaChangeEvent)) {
            LOG.info("Schema change event {} is redundant for current schema {}, just skip it.", schemaChangeEvent, orElse);
        } else {
            this.schemaManager.applyOriginalSchemaChange(schemaChangeEvent);
            arrayList.addAll(deduceEvolvedSchemaChanges(schemaChangeEvent));
        }
        LOG.info("All sink subtask have flushed for table {}. Start to apply schema change request: \n\t{}\nthat extracts to:\n\t{}", new Object[]{schemaChangeRequest.getTableId().toString(), schemaChangeRequest, arrayList.stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining("\n\t"))});
        this.schemaChangeThreadPool.submit(() -> {
            try {
                applySchemaChange(schemaChangeEvent, arrayList);
            } catch (Throwable th) {
                failJob("Schema change applying task", new FlinkRuntimeException("Failed to apply schema change event.", th));
                throw th;
            }
        });
    }

    private List<SchemaChangeEvent> deduceEvolvedSchemaChanges(SchemaChangeEvent schemaChangeEvent) {
        LOG.info("Step 1 - Start deducing evolved schema change for {}", schemaChangeEvent);
        TableId tableId = schemaChangeEvent.tableId();
        ArrayList arrayList = new ArrayList();
        Set<TableId> allOriginalTables = this.schemaManager.getAllOriginalTables();
        Set<TableId> affectedEvolvedTables = SchemaDerivator.getAffectedEvolvedTables(this.router, Collections.singleton(tableId));
        LOG.info("Step 2 - Affected downstream tables are: {}", affectedEvolvedTables);
        for (TableId tableId2 : affectedEvolvedTables) {
            Schema orElse = this.schemaManager.getLatestEvolvedSchema(tableId2).orElse(null);
            LOG.info("Step 3.1 - For to-be-evolved table {} with schema {}...", tableId2, orElse);
            Set<TableId> reverseLookupDependingUpstreamTables = SchemaDerivator.reverseLookupDependingUpstreamTables(this.router, tableId2, allOriginalTables);
            Preconditions.checkArgument(!reverseLookupDependingUpstreamTables.isEmpty(), "An affected sink table's upstream dependency cannot be empty.", new Object[0]);
            LOG.info("Step 3.2 - upstream dependency tables are: {}", reverseLookupDependingUpstreamTables);
            ArrayList arrayList2 = new ArrayList();
            if (reverseLookupDependingUpstreamTables.size() == 1) {
                SchemaChangeEvent copy = schemaChangeEvent.copy(tableId2);
                arrayList2.add(copy);
                LOG.info("Step 3.3 - It's an one-by-one routing and could be forwarded as {}.", copy);
            } else {
                Set<Schema> reverseLookupDependingUpstreamSchemas = SchemaDerivator.reverseLookupDependingUpstreamSchemas(this.router, tableId2, this.schemaManager);
                LOG.info("Step 3.3 - Upstream dependency schemas are: {}.", reverseLookupDependingUpstreamSchemas);
                Schema schema = orElse;
                Iterator<Schema> it = reverseLookupDependingUpstreamSchemas.iterator();
                while (it.hasNext()) {
                    schema = SchemaMergingUtils.getLeastCommonSchema(schema, it.next());
                }
                LOG.info("Step 3.4 - Deduced widest schema is: {}.", schema);
                List schemaDifference = SchemaMergingUtils.getSchemaDifference(tableId2, orElse, schema);
                LOG.info("Step 3.5 - It's an many-to-one routing and causes schema changes: {}.", schemaDifference);
                arrayList2.addAll(schemaDifference);
            }
            List<SchemaChangeEvent> normalizeSchemaChangeEvents = SchemaDerivator.normalizeSchemaChangeEvents(orElse, arrayList2, this.behavior, this.metadataApplier);
            LOG.info("Step 4 - After being normalized with {} behavior, final schema change events are: {}", this.behavior, normalizeSchemaChangeEvents);
            arrayList.addAll(normalizeSchemaChangeEvents);
        }
        return arrayList;
    }

    private void applySchemaChange(SchemaChangeEvent schemaChangeEvent, List<SchemaChangeEvent> list) {
        if (SchemaChangeBehavior.EXCEPTION.equals(this.behavior) && list.stream().anyMatch(schemaChangeEvent2 -> {
            return !(schemaChangeEvent2 instanceof CreateTableEvent);
        })) {
            throw new SchemaEvolveException(list.stream().filter(schemaChangeEvent3 -> {
                return !(schemaChangeEvent3 instanceof CreateTableEvent);
            }).findAny().get(), "Unexpected schema change events occurred in EXCEPTION mode. Job will fail now.");
        }
        ArrayList arrayList = new ArrayList();
        for (SchemaChangeEvent schemaChangeEvent4 : list) {
            if (applyAndUpdateEvolvedSchemaChange(schemaChangeEvent4)) {
                arrayList.add(schemaChangeEvent4);
            }
        }
        HashMap hashMap = new HashMap();
        for (TableId tableId : this.router.route(schemaChangeEvent.tableId())) {
            hashMap.put(tableId, this.schemaManager.getLatestEvolvedSchema(tableId).orElse(null));
        }
        this.pendingResponseFuture.complete(CoordinationResponseUtils.wrap(SchemaChangeResponse.success(arrayList, hashMap)));
        this.pendingResponseFuture = null;
        Preconditions.checkState(this.schemaChangeStatus == RequestStatus.APPLYING, "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not " + this.schemaChangeStatus);
        this.schemaChangeStatus = RequestStatus.IDLE;
        LOG.info("SchemaChangeStatus switched from APPLYING to IDLE.");
    }

    private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) {
        try {
            this.metadataApplier.applySchemaChange(schemaChangeEvent);
            this.schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
            LOG.info("Successfully applied schema change event {} to external system.", schemaChangeEvent);
            return true;
        } catch (Throwable th) {
            if (!shouldIgnoreException(th)) {
                throw th;
            }
            LOG.warn("Failed to apply schema change {}, but keeps running in tolerant mode. Caused by: {}", schemaChangeEvent, th);
            return false;
        }
    }

    private boolean shouldIgnoreException(Throwable th) {
        return (th instanceof UnsupportedSchemaChangeEventException) && SchemaChangeBehavior.TRY_EVOLVE.equals(this.behavior);
    }

    private void consumeUnusedSchemaDerivationBytes(DataInputStream dataInputStream) throws IOException {
        TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
        int readInt = dataInputStream.readInt();
        HashMap hashMap = new HashMap(readInt);
        for (int i = 0; i < readInt; i++) {
            TableId m75deserialize = tableIdSerializer.m75deserialize((DataInputView) new DataInputViewStreamWrapper(dataInputStream));
            int readInt2 = dataInputStream.readInt();
            HashSet hashSet = new HashSet(readInt2);
            for (int i2 = 0; i2 < readInt2; i2++) {
                hashSet.add(tableIdSerializer.m75deserialize((DataInputView) new DataInputViewStreamWrapper(dataInputStream)));
            }
            hashMap.put(m75deserialize, hashSet);
        }
    }
}
