package org.apache.flink.cdc.runtime.partitioning;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunction;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

@Internal
/* loaded from: input_file:org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.class */
public class DistributedPrePartitionOperator extends AbstractStreamOperator<PartitioningEvent> implements OneInputStreamOperator<Event, PartitioningEvent>, Serializable {
    private static final long serialVersionUID = 1;
    private final int downstreamParallelism;
    private final HashFunctionProvider<DataChangeEvent> hashFunctionProvider;
    private transient Map<TableId, Schema> schemaMap;
    private transient Map<TableId, HashFunction<DataChangeEvent>> hashFunctionMap;
    private transient int subTaskId;

    public DistributedPrePartitionOperator(int i, HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.downstreamParallelism = i;
        this.hashFunctionProvider = hashFunctionProvider;
    }

    public void open() throws Exception {
        super.open();
        this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
        this.schemaMap = new HashMap();
        this.hashFunctionMap = new HashMap();
    }

    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        SchemaChangeEvent schemaChangeEvent = (Event) streamRecord.getValue();
        if (!(schemaChangeEvent instanceof SchemaChangeEvent)) {
            if (!(schemaChangeEvent instanceof DataChangeEvent)) {
                throw new IllegalStateException(this.subTaskId + "> PrePartition operator received an unexpected event: " + schemaChangeEvent);
            }
            partitionBy((DataChangeEvent) schemaChangeEvent);
        } else {
            SchemaChangeEvent schemaChangeEvent2 = schemaChangeEvent;
            TableId tableId = schemaChangeEvent2.tableId();
            this.schemaMap.compute(tableId, (tableId2, schema) -> {
                return SchemaUtils.applySchemaChangeEvent(schema, schemaChangeEvent2);
            });
            this.hashFunctionMap.put(tableId, recreateHashFunction(tableId));
            broadcastEvent(schemaChangeEvent);
        }
    }

    private void partitionBy(DataChangeEvent dataChangeEvent) {
        this.output.collect(new StreamRecord(PartitioningEvent.ofDistributed(dataChangeEvent, this.subTaskId, this.hashFunctionMap.get(dataChangeEvent.tableId()).hashcode(dataChangeEvent) % this.downstreamParallelism)));
    }

    private void broadcastEvent(Event event) {
        for (int i = 0; i < this.downstreamParallelism; i++) {
            this.output.collect(new StreamRecord(PartitioningEvent.ofDistributed(EventSerializer.INSTANCE.copy(event), this.subTaskId, i)));
        }
    }

    private HashFunction<DataChangeEvent> recreateHashFunction(TableId tableId) {
        return this.hashFunctionProvider.getHashFunction(tableId, this.schemaMap.get(tableId));
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
    }
}
