package org.apache.flink.cdc.runtime.operators.schema.distributed;

import java.io.Serializable;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable;
import org.apache.flink.shaded.guava31.com.google.common.collect.Table;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.class */
public class SchemaOperator extends AbstractStreamOperator<Event> implements OneInputStreamOperator<PartitioningEvent, Event>, Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class);
    private final Duration rpcTimeOut;
    private final String timezone;
    private final SchemaChangeBehavior schemaChangeBehavior;
    private final List<RouteRule> routingRules;
    private transient TaskOperatorEventGateway toCoordinator;
    private transient int subTaskId;
    private volatile transient Table<TableId, Integer, Schema> upstreamSchemaTable;
    private volatile transient Map<TableId, Schema> evolvedSchemaMap;
    private transient TableIdRouter tableIdRouter;
    private transient SchemaDerivator derivator;
    private transient SchemaOperatorMetrics schemaOperatorMetrics;

    public SchemaOperator(List<RouteRule> list, Duration duration, SchemaChangeBehavior schemaChangeBehavior, String str) {
        this.routingRules = list;
        this.rpcTimeOut = duration;
        this.schemaChangeBehavior = schemaChangeBehavior;
        this.timezone = str;
    }

    public void open() throws Exception {
        super.open();
        this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
        this.upstreamSchemaTable = HashBasedTable.create();
        this.evolvedSchemaMap = new HashMap();
        this.tableIdRouter = new TableIdRouter(this.routingRules);
        this.derivator = new SchemaDerivator();
        this.schemaOperatorMetrics = new SchemaOperatorMetrics(getRuntimeContext().getMetricGroup(), this.schemaChangeBehavior);
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Event>> output) {
        super.setup(streamTask, streamConfig, output);
        this.toCoordinator = streamTask.getEnvironment().getOperatorCoordinatorEventGateway();
    }

    public void processElement(StreamRecord<PartitioningEvent> streamRecord) throws Exception {
        PartitioningEvent partitioningEvent = (PartitioningEvent) streamRecord.getValue();
        SchemaChangeEvent payload = partitioningEvent.getPayload();
        int sourcePartition = partitioningEvent.getSourcePartition();
        if (!(payload instanceof SchemaChangeEvent)) {
            if (!(payload instanceof DataChangeEvent)) {
                throw new IllegalStateException(this.subTaskId + "> SchemaOperator received an unexpected event: " + payload);
            }
            DataChangeEvent dataChangeEvent = (DataChangeEvent) payload;
            TableId tableId = dataChangeEvent.tableId();
            Schema schema = (Schema) this.upstreamSchemaTable.get(dataChangeEvent.tableId(), Integer.valueOf(sourcePartition));
            for (TableId tableId2 : this.tableIdRouter.route(tableId)) {
                Schema schema2 = this.evolvedSchemaMap.get(tableId2);
                this.output.collect(new StreamRecord(this.derivator.coerceDataRecord(this.timezone, DataChangeEvent.route(dataChangeEvent, tableId2), schema, schema2).orElseThrow(() -> {
                    return new IllegalStateException(String.format("Unable to coerce data record from %s (schema: %s) to %s (schema: %s)", tableId, schema, tableId2, schema2));
                })));
            }
            return;
        }
        this.schemaOperatorMetrics.increaseSchemaChangeEvents(1L);
        SchemaChangeEvent schemaChangeEvent = payload;
        TableId tableId3 = schemaChangeEvent.tableId();
        this.upstreamSchemaTable.put(tableId3, Integer.valueOf(sourcePartition), SchemaUtils.applySchemaChangeEvent((Schema) this.upstreamSchemaTable.get(tableId3, Integer.valueOf(sourcePartition)), schemaChangeEvent));
        if (!(schemaChangeEvent instanceof CreateTableEvent)) {
            if (this.schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
                LOG.info("{}> Schema change event {} has been ignored.", Integer.valueOf(this.subTaskId), payload);
                this.schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(1L);
                return;
            } else if (this.schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION) {
                throw new SchemaEvolveException(schemaChangeEvent, "Unexpected schema change events occurred in EXCEPTION mode. Job will fail now.");
            }
        }
        requestSchemaChange(tableId3, new SchemaChangeRequest(sourcePartition, this.subTaskId, schemaChangeEvent));
        this.schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(1L);
    }

    private void requestSchemaChange(TableId tableId, SchemaChangeRequest schemaChangeRequest) {
        LOG.info("{}> Sent FlushEvent to downstream...", Integer.valueOf(this.subTaskId));
        this.output.collect(new StreamRecord(new FlushEvent(this.subTaskId, this.tableIdRouter.route(tableId), schemaChangeRequest.getSchemaChangeEvent().getType())));
        LOG.info("{}> Sending evolve request...", Integer.valueOf(this.subTaskId));
        SchemaChangeResponse schemaChangeResponse = (SchemaChangeResponse) sendRequestToCoordinator(schemaChangeRequest);
        LOG.info("{}> Evolve request response: {}", Integer.valueOf(this.subTaskId), schemaChangeResponse);
        schemaChangeResponse.getSchemaEvolveResult().forEach(schemaChangeEvent -> {
            this.evolvedSchemaMap.compute(schemaChangeEvent.tableId(), (tableId2, schema) -> {
                return SchemaUtils.applySchemaChangeEvent(schema, schemaChangeEvent);
            });
        });
        schemaChangeResponse.getSchemaEvolveResult().forEach(schemaChangeEvent2 -> {
            this.output.collect(new StreamRecord(schemaChangeEvent2));
        });
        LOG.info("{}> Successfully updated evolved schema cache. Current state: {}", Integer.valueOf(this.subTaskId), this.evolvedSchemaMap);
    }

    private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse> RESPONSE sendRequestToCoordinator(REQUEST request) {
        try {
            return (RESPONSE) CoordinationResponseUtils.unwrap((CoordinationResponse) this.toCoordinator.sendRequestToCoordinator(getOperatorID(), new SerializedValue(request)).get(this.rpcTimeOut.toMillis(), TimeUnit.MILLISECONDS));
        } catch (Exception e) {
            throw new IllegalStateException("Failed to send request to coordinator: " + request.toString(), e);
        }
    }
}
