package org.apache.flink.cdc.runtime.operators.schema.common;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;
import org.apache.flink.cdc.runtime.operators.schema.common.event.GetEvolvedSchemaRequest;
import org.apache.flink.cdc.runtime.operators.schema.common.event.GetEvolvedSchemaResponse;
import org.apache.flink.cdc.runtime.operators.schema.common.event.GetOriginalSchemaRequest;
import org.apache.flink.cdc.runtime.operators.schema.common.event.GetOriginalSchemaResponse;
import org.apache.flink.cdc.runtime.operators.schema.common.event.SinkWriterRegisterEvent;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract base for schema registry coordinators.
 *
 * <p>The class implements {@link OperatorCoordinator} and {@link CoordinationRequestHandler}.
 * Incoming operator events, coordination requests and checkpoint snapshots are dispatched to
 * {@link #coordinatorExecutor} through {@link #runInEventLoop}; any failure raised by such a task
 * is reported via {@link #handleUnrecoverableError} and then fails the job through the coordinator
 * context.
 *
 * <p>Subclasses supply the snapshot/restore logic, the handling of {@link FlushSuccessEvent}s and
 * the handling of any coordination request other than the two schema lookups handled here.
 */
@Internal
/* loaded from: input_file:org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.class */
public abstract class SchemaRegistry implements OperatorCoordinator, CoordinationRequestHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistry.class);

    /** Number of nanoseconds per millisecond, used to convert monotonic clock readings. */
    private static final long NANOS_PER_MILLI = 1_000_000L;

    // ---- Immutable configuration supplied at construction time ----
    protected final OperatorCoordinator.Context context;
    protected final String operatorName;
    protected final ExecutorService coordinatorExecutor;
    protected final MetadataApplier metadataApplier;
    protected final Duration rpcTimeout;
    protected final List<RouteRule> routingRules;
    protected final SchemaChangeBehavior behavior;

    // ---- Runtime state, (re-)initialized in start() ----
    protected transient int currentParallelism;
    protected transient Set<Integer> activeSinkWriters;
    protected transient Map<Integer, Throwable> failedReasons;
    protected transient SchemaManager schemaManager;
    protected transient TableIdRouter router;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stores the given collaborators; runtime state is not created until {@link #start()}.
     *
     * @param context coordinator context, used for the current parallelism and to fail the job
     * @param str operator name, used in log messages
     * @param executorService executor on which all event-loop tasks are run
     * @param metadataApplier metadata applier; closed in {@link #close()}
     * @param list routing rules handed to the {@link TableIdRouter} created in {@link #start()}
     * @param schemaChangeBehavior schema change behavior, kept for subclasses
     * @param duration RPC timeout, kept for subclasses
     */
    public SchemaRegistry(OperatorCoordinator.Context context, String str, ExecutorService executorService, MetadataApplier metadataApplier, List<RouteRule> list, SchemaChangeBehavior schemaChangeBehavior, Duration duration) {
        this.context = context;
        this.operatorName = str;
        this.coordinatorExecutor = executorService;
        this.metadataApplier = metadataApplier;
        this.routingRules = list;
        this.rpcTimeout = duration;
        this.behavior = schemaChangeBehavior;
    }

    /**
     * Initializes the runtime state: current parallelism, the set of registered sink writers, the
     * per-subtask failure reasons, a fresh {@link SchemaManager} and the {@link TableIdRouter}.
     *
     * @throws Exception declared for subclasses; nothing here throws a checked exception
     */
    public void start() throws Exception {
        LOG.info("Starting SchemaRegistry - {}.", this.operatorName);
        this.currentParallelism = this.context.currentParallelism();
        this.activeSinkWriters = ConcurrentHashMap.newKeySet();
        this.failedReasons = new ConcurrentHashMap<>();
        // NOTE(review): this unconditionally replaces schemaManager. If resetToCheckpoint() /
        // restore() can be invoked before start(), any state a subclass restored into
        // schemaManager would be discarded here - confirm the lifecycle order.
        this.schemaManager = new SchemaManager();
        this.router = new TableIdRouter(this.routingRules);
    }

    /**
     * Shuts down the coordinator executor and closes the metadata applier.
     *
     * @throws Exception an {@link IOException} wrapping the cause if the metadata applier fails
     *     to close
     */
    public void close() throws Exception {
        LOG.info("Closing SchemaRegistry - {}.", this.operatorName);
        this.coordinatorExecutor.shutdown();
        try {
            this.metadataApplier.close();
        } catch (Exception e) {
            LOG.error("Failed to close metadata applier.", e);
            throw new IOException("Failed to close metadata applier.", e);
        }
    }

    /**
     * Writes the coordinator state for a checkpoint.
     *
     * @param completableFuture future to complete with the serialized state
     * @throws Exception if the state cannot be captured
     */
    protected abstract void snapshot(CompletableFuture<byte[]> completableFuture) throws Exception;

    /**
     * Restores the coordinator state from checkpoint data.
     *
     * @param bArr serialized state; never {@code null} when called from
     *     {@link #resetToCheckpoint(long, byte[])}
     * @throws Exception if the state cannot be restored
     */
    protected abstract void restore(byte[] bArr) throws Exception;

    /**
     * Records the subtask carried by the event in {@link #activeSinkWriters}.
     *
     * @param sinkWriterRegisterEvent registration event sent by a sink writer
     * @throws Exception declared for subclasses
     */
    protected void handleSinkWriterRegisterEvent(SinkWriterRegisterEvent sinkWriterRegisterEvent) throws Exception {
        LOG.info("Sink subtask {} already registered.", sinkWriterRegisterEvent.getSubtask());
        this.activeSinkWriters.add(sinkWriterRegisterEvent.getSubtask());
    }

    /**
     * Handles a flush success event.
     *
     * @param flushSuccessEvent the received event
     * @throws Exception if handling fails; the event loop then fails the job
     */
    protected abstract void handleFlushSuccessEvent(FlushSuccessEvent flushSuccessEvent) throws Exception;

    /**
     * Answers an evolved schema lookup.
     *
     * <p>A requested version of {@code -1} selects the latest evolved schema of the table. The
     * response carries {@code null} when no latest schema exists or when the schema manager
     * rejects the requested version with an {@link IllegalArgumentException}.
     *
     * @param getEvolvedSchemaRequest the lookup request
     * @param completableFuture future completed with the wrapped response
     * @throws Exception declared for subclasses
     */
    protected void handleGetEvolvedSchemaRequest(GetEvolvedSchemaRequest getEvolvedSchemaRequest, CompletableFuture<CoordinationResponse> completableFuture) throws Exception {
        LOG.info("Handling evolved schema request: {}", getEvolvedSchemaRequest);
        int schemaVersion = getEvolvedSchemaRequest.getSchemaVersion();
        TableId tableId = getEvolvedSchemaRequest.getTableId();
        if (schemaVersion == -1) {
            completableFuture.complete(CoordinationResponseUtils.wrap(new GetEvolvedSchemaResponse(this.schemaManager.getLatestEvolvedSchema(tableId).orElse(null))));
            return;
        }
        try {
            completableFuture.complete(CoordinationResponseUtils.wrap(new GetEvolvedSchemaResponse(this.schemaManager.getEvolvedSchema(tableId, schemaVersion))));
        } catch (IllegalArgumentException e) {
            LOG.warn("Some client is requesting an non-existed evolved schema for table {} with version {}", tableId, schemaVersion);
            completableFuture.complete(CoordinationResponseUtils.wrap(new GetEvolvedSchemaResponse(null)));
        }
    }

    /**
     * Answers an original schema lookup.
     *
     * <p>A requested version of {@code -1} selects the latest original schema of the table. The
     * response carries {@code null} when no latest schema exists or when the schema manager
     * rejects the requested version with an {@link IllegalArgumentException}.
     *
     * @param getOriginalSchemaRequest the lookup request
     * @param completableFuture future completed with the wrapped response
     * @throws Exception declared for subclasses
     */
    protected void handleGetOriginalSchemaRequest(GetOriginalSchemaRequest getOriginalSchemaRequest, CompletableFuture<CoordinationResponse> completableFuture) throws Exception {
        LOG.info("Handling original schema request: {}", getOriginalSchemaRequest);
        int schemaVersion = getOriginalSchemaRequest.getSchemaVersion();
        TableId tableId = getOriginalSchemaRequest.getTableId();
        if (schemaVersion == -1) {
            completableFuture.complete(CoordinationResponseUtils.wrap(new GetOriginalSchemaResponse(this.schemaManager.getLatestOriginalSchema(tableId).orElse(null))));
            return;
        }
        try {
            completableFuture.complete(CoordinationResponseUtils.wrap(new GetOriginalSchemaResponse(this.schemaManager.getOriginalSchema(tableId, schemaVersion))));
        } catch (IllegalArgumentException e) {
            LOG.warn("Some client is requesting an non-existed original schema for table {} with version {}", tableId, schemaVersion);
            completableFuture.complete(CoordinationResponseUtils.wrap(new GetOriginalSchemaResponse(null)));
        }
    }

    /**
     * Handles every coordination request that is neither a {@link GetEvolvedSchemaRequest} nor a
     * {@link GetOriginalSchemaRequest}.
     *
     * @param coordinationRequest the received request
     * @param completableFuture future to complete with the response
     * @throws Exception if handling fails; the event loop then fails the job
     */
    protected abstract void handleCustomCoordinationRequest(CoordinationRequest coordinationRequest, CompletableFuture<CoordinationResponse> completableFuture) throws Exception;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Logs an unrecoverable failure together with the current schema manager state.
     *
     * @param str description of the action that failed
     * @param th the failure
     */
    public void handleUnrecoverableError(String str, Throwable th) {
        // The trailing Throwable is picked up by SLF4J as the exception to print.
        LOG.error("Uncaught exception in the Schema Registry ({}) event loop for {}.", this.operatorName, str, th);
        LOG.error("\tCurrent schema manager state: {}", this.schemaManager);
    }

    /**
     * Dispatches a coordination request to the matching handler on the event loop.
     *
     * <p>If the handler throws, the returned future is completed exceptionally so the requester
     * does not wait for a response that will never arrive; the failure is then propagated to the
     * event loop, which fails the job.
     *
     * @param coordinationRequest the received request
     * @return future that the handler completes with the response
     */
    public final CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest coordinationRequest) {
        CompletableFuture<CoordinationResponse> completableFuture = new CompletableFuture<>();
        runInEventLoop(() -> {
            try {
                if (coordinationRequest instanceof GetEvolvedSchemaRequest) {
                    handleGetEvolvedSchemaRequest((GetEvolvedSchemaRequest) coordinationRequest, completableFuture);
                } else if (coordinationRequest instanceof GetOriginalSchemaRequest) {
                    handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) coordinationRequest, completableFuture);
                } else {
                    handleCustomCoordinationRequest(coordinationRequest, completableFuture);
                }
            } catch (Throwable th) {
                // No-op if the handler already completed the future before failing.
                completableFuture.completeExceptionally(th);
                throw th;
            }
        }, "Handling request - %s", coordinationRequest);
        return completableFuture;
    }

    /**
     * Dispatches an operator event to the matching handler on the event loop.
     *
     * @param i subtask index the event originates from (used in the failure description)
     * @param i2 unused here
     * @param operatorEvent the received event; anything other than a {@link FlushSuccessEvent} or
     *     {@link SinkWriterRegisterEvent} raises a {@link FlinkRuntimeException} inside the event
     *     loop
     */
    public final void handleEventFromOperator(int i, int i2, OperatorEvent operatorEvent) {
        runInEventLoop(() -> {
            if (operatorEvent instanceof FlushSuccessEvent) {
                handleFlushSuccessEvent((FlushSuccessEvent) operatorEvent);
            } else if (operatorEvent instanceof SinkWriterRegisterEvent) {
                handleSinkWriterRegisterEvent((SinkWriterRegisterEvent) operatorEvent);
            } else {
                throw new FlinkRuntimeException("Unrecognized Operator Event: " + operatorEvent);
            }
        }, "Handling event - %s (from subTask %d)", operatorEvent, i);
    }

    /**
     * Logs that a subtask was reset, attaching the failure recorded for it, if any.
     *
     * @param i subtask index
     * @param j checkpoint id the subtask was reset to
     */
    public final void subtaskReset(int i, long j) {
        // A null reason simply leaves the log line without a stack trace.
        Throwable recordedReason = this.failedReasons.get(i);
        LOG.error("Subtask {} reset at checkpoint {}.", i, j, recordedReason);
    }

    /**
     * Remembers the failure reason of a subtask so {@link #subtaskReset(int, long)} can log it.
     *
     * @param i subtask index
     * @param i2 unused here
     * @param th failure reason; ignored when {@code null}
     */
    public final void executionAttemptFailed(int i, int i2, @Nullable Throwable th) {
        if (th != null) {
            this.failedReasons.put(i, th);
        }
    }

    /**
     * Intentionally does nothing.
     *
     * @param i unused
     * @param i2 unused
     * @param subtaskGateway unused
     */
    public final void executionAttemptReady(int i, int i2, OperatorCoordinator.SubtaskGateway subtaskGateway) {
    }

    /**
     * Schedules {@link #snapshot(CompletableFuture)} on the event loop.
     *
     * <p>If the snapshot throws, the future is completed exceptionally before the failure is
     * propagated, so the future never stays pending after a failed snapshot.
     *
     * @param j checkpoint id
     * @param completableFuture future to complete with the serialized coordinator state
     * @throws Exception declared by the coordinator interface
     */
    public final void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) throws Exception {
        LOG.info("Going to start checkpoint No.{}", j);
        runInEventLoop(() -> {
            try {
                snapshot(completableFuture);
            } catch (Throwable th) {
                // No-op if snapshot() already completed the future before failing.
                completableFuture.completeExceptionally(th);
                throw th;
            }
        }, "Taking checkpoint - %d", j);
    }

    /**
     * Logs the completion of a checkpoint.
     *
     * @param j checkpoint id
     */
    public final void notifyCheckpointComplete(long j) {
        LOG.info("Successfully completed checkpoint No.{}", j);
    }

    /**
     * Restores the coordinator from checkpoint data by delegating to {@link #restore(byte[])}.
     *
     * @param j checkpoint id
     * @param bArr serialized state; when {@code null} nothing is restored
     * @throws Exception if {@link #restore(byte[])} fails
     */
    public final void resetToCheckpoint(long j, @Nullable byte[] bArr) throws Exception {
        LOG.info("Going to restore from checkpoint No.{}", j);
        if (bArr == null) {
            return;
        }
        restore(bArr);
    }

    /**
     * Submits an action to {@link #coordinatorExecutor}.
     *
     * <p>If the action throws, fatal errors and OOMs are rethrown; everything else is reported via
     * {@link #handleUnrecoverableError(String, Throwable)} and fails the job.
     *
     * @param throwingRunnable the action to run
     * @param str {@link String#format} pattern describing the action, used only on failure
     * @param objArr arguments for the pattern
     */
    protected void runInEventLoop(ThrowingRunnable<Throwable> throwingRunnable, String str, Object... objArr) {
        this.coordinatorExecutor.execute(() -> {
            try {
                throwingRunnable.run();
            } catch (Throwable th) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM(th);
                handleUnrecoverableError(describeAction(str, objArr), th);
                this.context.failJob(th);
            }
        });
    }

    /**
     * Formats an action description without ever throwing.
     *
     * <p>This runs while a failure is already being handled: a malformed pattern or a throwing
     * {@code toString()} must not mask that failure or prevent the job from being failed, so the
     * raw pattern is used as a fallback.
     */
    private static String describeAction(String format, Object... args) {
        try {
            return String.format(format, args);
        } catch (RuntimeException formatFailure) {
            LOG.warn("Failed to format the event loop action description \"{}\".", format, formatFailure);
            return String.valueOf(format);
        }
    }

    /**
     * Repeats an action until a condition becomes true; see
     * {@link #loopWhen(BooleanSupplier, Runnable, Duration, Duration)}.
     *
     * @param booleanSupplier condition that ends the loop once it returns {@code true}
     * @param runnable action executed on every iteration
     * @param duration maximum total time to keep looping
     * @param duration2 pause between iterations
     * @throws TimeoutException if the time limit is exceeded before the condition holds
     */
    protected void loopUntil(BooleanSupplier booleanSupplier, Runnable runnable, Duration duration, Duration duration2) throws TimeoutException {
        loopWhen(() -> !booleanSupplier.getAsBoolean(), runnable, duration, duration2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Repeats an action for as long as a condition holds.
     *
     * <p>Each iteration runs the action, then checks the time limit, then sleeps. The time limit
     * is therefore only evaluated after the action has run. Elapsed time is measured with the
     * monotonic {@link System#nanoTime()} clock, so wall-clock adjustments cannot shorten or
     * extend the limit, and very large limits cannot overflow into an already-expired deadline.
     *
     * @param booleanSupplier condition that keeps the loop running while it returns {@code true}
     * @param runnable action executed on every iteration
     * @param duration maximum total time to keep looping (millisecond granularity)
     * @param duration2 pause between iterations
     * @throws TimeoutException if the time limit is exceeded while the condition still holds
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *     interrupted while sleeping; the interrupt flag is restored first
     */
    public void loopWhen(BooleanSupplier booleanSupplier, Runnable runnable, Duration duration, Duration duration2) throws TimeoutException {
        final long startNanos = System.nanoTime();
        final long timeoutMillis = duration.toMillis();
        final long pauseMillis = duration2.toMillis();
        while (booleanSupplier.getAsBoolean()) {
            runnable.run();
            // Differences of nanoTime() readings are overflow-safe by contract.
            long elapsedMillis = (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
            if (elapsedMillis > timeoutMillis) {
                throw new TimeoutException("Loop checking time limit has exceeded.");
            }
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Fails the job because of the given error, after logging it. Fatal errors and OOMs are
     * rethrown instead.
     *
     * @param str description of where the error was triggered
     * @param t the error
     * @param <T> concrete type of the error
     */
    public <T extends Throwable> void failJob(String str, T t) {
        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
        LOG.error("An exception was triggered from {}. Job will fail now.", str, t);
        handleUnrecoverableError(str, t);
        this.context.failJob(t);
    }

    /**
     * Registers a new original schema for a table directly in the schema manager (test hook).
     *
     * @param tableId the table
     * @param schema the schema to register
     */
    @VisibleForTesting
    public void emplaceOriginalSchema(TableId tableId, Schema schema) {
        this.schemaManager.registerNewOriginalSchema(tableId, schema);
    }

    /**
     * Registers a new evolved schema for a table directly in the schema manager (test hook).
     *
     * @param tableId the table
     * @param schema the schema to register
     */
    @VisibleForTesting
    public void emplaceEvolvedSchema(TableId tableId, Schema schema) {
        this.schemaManager.registerNewEvolvedSchema(tableId, schema);
    }
}
