/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.jobhub.sdk.client.kafka;

import com.ovopark.jobhub.sdk.client.JobService;
import com.ovopark.jobhub.sdk.model.TaskGetResponse;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.Model;
import com.ovopark.kernel.shared.Util;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.TopicPartitionOffset;

/**
 * Task listener that handles one task by running a dedicated Kafka listener container against a
 * single topic partition and handing every received record to a {@link SubKafkaDataListener}.
 *
 * <p>The calling thread of {@link #on(JobService.TaskContext)} blocks until the consumption is
 * cancelled, either through {@link ConsumerRecordContext#cancel()} or because
 * {@code taskContext.isCancelled()} reports {@code true}; the container is then stopped.
 *
 * <p>Subclasses supply the consumer configuration, the consumer factory and the record callback.
 */
public abstract class HeavyPerTopicPartitionTaskListener
implements JobService.TaskListener {
    private static final Logger log = LoggerFactory.getLogger(HeavyPerTopicPartitionTaskListener.class);
    /** Length of one wait on the completion latch before task cancellation is re-checked. */
    private static final long CANCEL_POLL_SECONDS = 30L;
    /** MDC keys set while a record is being processed and removed afterwards. */
    private static final String MDC_TRACE_ID = "traceId";
    private static final String MDC_REQUEST_ID = "requestId";

    @Autowired
    protected JobService jobService;

    /**
     * Processes one task: starts a Kafka listener container for the configured topic partition,
     * forwards records to the {@link SubKafkaDataListener} and blocks until cancellation.
     *
     * <p>Returns immediately (after logging) when the task's JSON argument string is empty.
     * {@link SubKafkaDataListener#beforeStartSubKafka} is called before the container is created and
     * {@link SubKafkaDataListener#afterEndSubKafka} after it has been stopped on the normal path.
     *
     * @param taskContext context of the task being executed
     * @throws RuntimeException as produced by {@code Util.convert2RuntimeException} when the waiting
     *         thread is interrupted; the thread's interrupt status is restored before it propagates
     */
    public void on(final JobService.TaskContext taskContext) {
        final TaskGetResponse.Task task = taskContext.task();
        log.info("process task: {}", JSONAccessor.impl().format((Object)task));
        String taskId = task.getId();
        String jsonStr = task.getJsonStr();
        if (Util.isEmpty((CharSequence)jsonStr)) {
            log.info("args is empty???{}", JSONAccessor.impl().format((Object)task));
            return;
        }

        final SubKafkaDataListener subKafkaDataListener = this.subKafkaDataListener(taskContext);
        subKafkaDataListener.beforeStartSubKafka(taskContext);

        SubKafkaConsumerConfig subKafkaConsumerConfig = this.subKafkaConsumerConfig(taskContext);
        final String topic = subKafkaConsumerConfig.topic();
        int partition = subKafkaConsumerConfig.partition();
        final long startTimeMs = subKafkaConsumerConfig.startTimeMs();
        ConsumerFactory<String, String> subKafkaConsumerFactory = this.subKafkaConsumerFactory();

        ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset[]{new TopicPartitionOffset(topic, partition)});
        // A fresh group id per run, so this consumer does not share a group with earlier runs.
        containerProps.setGroupId(taskId + ":" + Util.uniqueFirstPart() + ":" + partition);
        Map<String, Object> kafkaConsumerProperties = subKafkaConsumerConfig.kafkaConsumerProperties();
        if (kafkaConsumerProperties != null) {
            containerProps.getKafkaConsumerProperties().putAll(kafkaConsumerProperties);
        }
        // NOTE(review): the container is given an explicit TopicPartitionOffset (manual assignment).
        // Confirm that spring-kafka invokes the rebalance listener in that mode; if it does not,
        // the timestamp seek below never runs.
        containerProps.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener(){

            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                // Interface default methods must be invoked through the qualified super form.
                ConsumerAwareRebalanceListener.super.onPartitionsAssigned(consumer, partitions);
                HeavyPerTopicPartitionTaskListener.seekToStartTime(consumer, partitions, startTimeMs);
            }
        });

        final AtomicLong count = new AtomicLong();
        // Released once consumption has been cancelled; the task thread waits on it below.
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicBoolean cancelled = new AtomicBoolean(false);
        containerProps.setMessageListener(new AcknowledgingMessageListener<String, String>(){

            public void onMessage(final ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
                try {
                    count.incrementAndGet();
                    if (cancelled.get()) {
                        // Already cancelled: drop the record and make sure the waiter is released.
                        countDownLatch.countDown();
                        return;
                    }
                    if (taskContext.isCancelled()) {
                        cancelled.set(true);
                        countDownLatch.countDown();
                        return;
                    }
                    long offset = record.offset();
                    String taskKey = "_job4kafka_" + task.getJobId() + "_task_" + task.getId() + "_" + offset;
                    MDC.put(MDC_TRACE_ID, taskKey);
                    MDC.put(MDC_REQUEST_ID, taskKey);
                    subKafkaDataListener.onData(taskContext, new ConsumerRecordContext(){

                        @Override
                        public ConsumerRecord<String, String> record() {
                            return record;
                        }

                        @Override
                        public void cancel() {
                            cancelled.set(true);
                            countDownLatch.countDown();
                        }
                    });
                }
                catch (Exception e) {
                    // Best effort: a failing record is logged and consumption continues.
                    log.error("Error processing record from topic: {}", topic, e);
                }
                finally {
                    MDC.remove(MDC_TRACE_ID);
                    MDC.remove(MDC_REQUEST_ID);
                }
            }
        });

        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(subKafkaConsumerFactory, containerProps);
        // Container events are discarded by this no-op publisher.
        container.setApplicationEventPublisher(event -> {});
        container.start();
        log.info("start kafka consumer: {}:{}", topic, partition);

        HeavyPerTopicPartitionTaskListener.awaitCompletion(taskContext, countDownLatch, container);

        boolean running = container.isRunning();
        Util.logLink("container is running?: " + running + ", consume data count: " + count.get()).log(msg -> log.info(msg)).log(msg -> taskContext.appendLog(msg));
        subKafkaDataListener.afterEndSubKafka(taskContext);
    }

    /**
     * Positions the consumer on each partition at the first offset whose timestamp is at or after
     * {@code startTimeMs}, as reported by {@code Consumer.offsetsForTimes}; partitions for which no
     * offset is reported are moved to the end. Does nothing when {@code startTimeMs <= 0}.
     */
    private static void seekToStartTime(Consumer<?, ?> consumer, Collection<TopicPartition> partitions, long startTimeMs) {
        if (startTimeMs <= 0L) {
            return;
        }
        Map<TopicPartition, Long> timestampsToSearch = partitions.stream().collect(Collectors.toMap(tp -> tp, tp -> startTimeMs));
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(timestampsToSearch);
        for (TopicPartition tp : partitions) {
            OffsetAndTimestamp found = offsetsForTimes.get(tp);
            if (found == null) {
                consumer.seekToEnd(Collections.singletonList(tp));
                log.info("seek offset (end) of timestamp ( {} )", startTimeMs);
                continue;
            }
            long offset = found.offset();
            consumer.seek(tp, offset);
            log.info("seek offset ({}) of timestamp ( {} )", offset, startTimeMs);
        }
    }

    /**
     * Blocks until {@code done} is released or the task reports cancellation (checked every
     * {@link #CANCEL_POLL_SECONDS} seconds), then stops the container in all cases.
     */
    private static void awaitCompletion(JobService.TaskContext taskContext, CountDownLatch done, KafkaMessageListenerContainer<String, String> container) {
        boolean interrupted = false;
        try {
            while (!done.await(CANCEL_POLL_SECONDS, TimeUnit.SECONDS)) {
                if (taskContext.isCancelled()) {
                    log.info("task is cancelled, end");
                    break;
                }
            }
        }
        catch (InterruptedException e) {
            interrupted = true;
            log.error(e.getMessage(), (Throwable)e);
            throw Util.convert2RuntimeException((Throwable)e);
        }
        finally {
            container.stop();
            // Restore the interrupt flag only after stop(), so that stop() itself is not cut short
            // by a pending interrupt, while callers can still observe the interruption.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Supplies the Kafka consumer configuration (topic, partition, start time, extra properties)
     * for the given task.
     *
     * @param taskContext context of the task being executed
     * @return the configuration used to build the listener container
     */
    protected abstract SubKafkaConsumerConfig subKafkaConsumerConfig(JobService.TaskContext taskContext);

    /**
     * Supplies the consumer factory from which the listener container creates its consumer.
     *
     * @return the consumer factory
     */
    protected abstract ConsumerFactory<String, String> subKafkaConsumerFactory();

    /**
     * Supplies the callback that receives lifecycle notifications and consumed records for the
     * given task.
     *
     * @param taskContext context of the task being executed
     * @return the data listener for this task
     */
    protected abstract SubKafkaDataListener subKafkaDataListener(JobService.TaskContext taskContext);

    /** Callback receiving lifecycle notifications and the records consumed for a task. */
    public interface SubKafkaDataListener {
        /** Called once before the consumer configuration is read and the container is created. */
        void beforeStartSubKafka(JobService.TaskContext taskContext);

        /**
         * Called for every record received while consumption has not been cancelled.
         *
         * @param taskContext context of the task being executed
         * @param recordContext access to the record and a handle to cancel consumption
         */
        void onData(JobService.TaskContext taskContext, ConsumerRecordContext recordContext);

        /** Called once after the container has been stopped on the normal completion path. */
        void afterEndSubKafka(JobService.TaskContext taskContext);
    }

    /** Describes which topic partition to consume and how the consumer is configured. */
    public interface SubKafkaConsumerConfig
    extends Model {
        /** Topic to consume from. */
        String topic();

        /** Partition of {@link #topic()} to consume from. */
        int partition();

        /** Not read by {@code HeavyPerTopicPartitionTaskListener.on}. */
        long offset();

        /** When greater than zero, the timestamp (in milliseconds) used to look up the start offset. */
        long startTimeMs();

        /** Not read by {@code HeavyPerTopicPartitionTaskListener.on}. */
        long endTimeMs();

        /** Extra Kafka consumer properties copied into the container properties; may be {@code null}. */
        Map<String, Object> kafkaConsumerProperties();
    }

    /** Per-record view handed to {@link SubKafkaDataListener#onData}. */
    public interface ConsumerRecordContext {
        /** The record being delivered. */
        ConsumerRecord<String, String> record();

        /** Requests the end of consumption: later records are skipped and the waiting task thread is released. */
        void cancel();
    }
}

