package org.apache.flink.streaming.connectors.kafka.internals;

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.class */
public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaFetcher.class);
    private final KafkaDeserializationSchema<T> deserializer;
    private final KafkaFetcher<T>.KafkaCollector kafkaCollector;
    final Handover handover;
    final KafkaConsumerThread consumerThread;
    volatile boolean running;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher$KafkaCollector.class */
    public class KafkaCollector implements Collector<T> {
        private final Queue<T> records;
        private boolean endOfStreamSignalled;

        private KafkaCollector() {
            this.records = new ArrayDeque();
            this.endOfStreamSignalled = false;
        }

        public void collect(T t) {
            if (this.endOfStreamSignalled || KafkaFetcher.this.deserializer.isEndOfStream(t)) {
                this.endOfStreamSignalled = true;
            } else {
                this.records.add(t);
            }
        }

        public Queue<T> getRecords() {
            return this.records;
        }

        public boolean isEndOfStreamSignalled() {
            return this.endOfStreamSignalled;
        }

        public void close() {
        }
    }

    public KafkaFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> map, SerializedValue<WatermarkStrategy<T>> serializedValue, ProcessingTimeService processingTimeService, long j, ClassLoader classLoader, String str, KafkaDeserializationSchema<T> kafkaDeserializationSchema, Properties properties, long j2, MetricGroup metricGroup, MetricGroup metricGroup2, boolean z) throws Exception {
        super(sourceContext, map, serializedValue, processingTimeService, j, classLoader, metricGroup2, z);
        this.running = true;
        this.deserializer = kafkaDeserializationSchema;
        this.handover = new Handover();
        this.consumerThread = new KafkaConsumerThread(LOG, this.handover, properties, this.unassignedPartitionsQueue, getFetcherName() + " for " + str, j2, z, metricGroup2, metricGroup);
        this.kafkaCollector = new KafkaCollector();
    }

    @Override // org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher
    public void runFetchLoop() throws Exception {
        try {
            this.consumerThread.start();
            while (this.running) {
                ConsumerRecords<byte[], byte[]> pollNext = this.handover.pollNext();
                for (KafkaTopicPartitionState<T, TopicPartition> kafkaTopicPartitionState : subscribedPartitionStates()) {
                    partitionConsumerRecordsHandler(pollNext.records(kafkaTopicPartitionState.getKafkaPartitionHandle()), kafkaTopicPartitionState);
                }
            }
            try {
                this.consumerThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            this.consumerThread.shutdown();
        }
    }

    @Override // org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher
    public void cancel() {
        this.running = false;
        this.handover.close();
        this.consumerThread.shutdown();
    }

    protected String getFetcherName() {
        return "Kafka Fetcher";
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected void partitionConsumerRecordsHandler(List<ConsumerRecord<byte[], byte[]>> list, KafkaTopicPartitionState<T, TopicPartition> kafkaTopicPartitionState) throws Exception {
        for (ConsumerRecord<byte[], byte[]> consumerRecord : list) {
            this.deserializer.deserialize(consumerRecord, this.kafkaCollector);
            emitRecordsWithTimestamps(this.kafkaCollector.getRecords(), kafkaTopicPartitionState, consumerRecord.offset(), consumerRecord.timestamp());
            if (this.kafkaCollector.isEndOfStreamSignalled()) {
                this.running = false;
                return;
            }
        }
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher
    public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition kafkaTopicPartition) {
        return new TopicPartition(kafkaTopicPartition.getTopic(), kafkaTopicPartition.getPartition());
    }

    @Override // org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher
    protected void doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> map, @Nonnull KafkaCommitCallback kafkaCommitCallback) throws Exception {
        List<KafkaTopicPartitionState<T, TopicPartition>> subscribedPartitionStates = subscribedPartitionStates();
        HashMap hashMap = new HashMap(subscribedPartitionStates.size());
        for (KafkaTopicPartitionState<T, TopicPartition> kafkaTopicPartitionState : subscribedPartitionStates) {
            Long l = map.get(kafkaTopicPartitionState.getKafkaTopicPartition());
            if (l != null) {
                Preconditions.checkState(l.longValue() >= 0, "Illegal offset value to commit");
                long longValue = l.longValue() + 1;
                hashMap.put(kafkaTopicPartitionState.getKafkaPartitionHandle(), new OffsetAndMetadata(longValue));
                kafkaTopicPartitionState.setCommittedOffset(longValue);
            }
        }
        this.consumerThread.setOffsetsToCommit(hashMap, kafkaCommitCallback);
    }
}
