/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.jobhub.sdk.client.kafka;

import com.ovopark.jobhub.sdk.client.JobService;
import com.ovopark.jobhub.sdk.model.TaskGetResponse;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.Model;
import com.ovopark.kernel.shared.Util;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.TopicPartitionOffset;

/**
 * Task listener that, for every task it receives, starts a dedicated
 * {@link KafkaMessageListenerContainer} bound to a single topic partition and forwards each
 * consumed record to a {@link SubKafkaDataListener}.
 *
 * <p>The task thread blocks in {@link #on(JobService.TaskContext)} until one of the following
 * happens: the data listener calls {@link ConsumerRecordContext#cancel()}, the task context
 * reports cancellation, or no record has been consumed for {@value #IDLE_EXPIRY_MS} ms. The
 * container is always stopped before {@code on} returns or throws.
 *
 * <p>Subclasses supply the per-task consumer configuration, the consumer factory and the data
 * listener through the three abstract methods.
 */
public abstract class HeavyPerTopicPartitionTaskListener
implements JobService.TaskListener {
    private static final Logger log = LoggerFactory.getLogger(HeavyPerTopicPartitionTaskListener.class);

    /** Seconds the task thread waits on the completion latch before re-checking the stop conditions. */
    private static final long AWAIT_INTERVAL_SECONDS = 30L;

    /** Milliseconds without any consumed record after which the task is treated as expired. */
    private static final long IDLE_EXPIRY_MS = 60000L;

    @Autowired
    protected JobService jobService;

    /**
     * Runs one task: consumes the configured topic partition until the run is cancelled or
     * goes idle, then stops the consumer.
     *
     * <p>Tasks whose JSON argument string is empty are logged and skipped without touching Kafka.
     * {@link SubKafkaDataListener#afterEndSubKafka} is only invoked when the wait loop ends
     * normally, not when the waiting thread is interrupted.
     *
     * @param taskContext context of the task to process; also receives the progress log lines
     * @throws RuntimeException (via {@code Util.convert2RuntimeException}) if the waiting thread
     *         is interrupted; the thread's interrupt status is restored before the exception
     *         leaves this method
     */
    public void on(JobService.TaskContext taskContext) {
        TaskGetResponse.Task task = taskContext.task();
        log.info("process task: {}", JSONAccessor.impl().format((Object) task));
        String taskId = task.getId();
        String jsonStr = task.getJsonStr();
        if (Util.isEmpty((CharSequence) jsonStr)) {
            log.info("args is empty???{}", JSONAccessor.impl().format((Object) task));
            return;
        }

        SubKafkaDataListener subKafkaDataListener = this.subKafkaDataListener(taskContext);
        subKafkaDataListener.beforeStartSubKafka(taskContext);
        SubKafkaConsumerConfig subKafkaConsumerConfig = this.subKafkaConsumerConfig(taskContext);
        String topic = subKafkaConsumerConfig.topic();
        int partition = subKafkaConsumerConfig.partition();
        ConsumerFactory<String, String> subKafkaConsumerFactory = this.subKafkaConsumerFactory();

        // The container is assigned exactly one partition explicitly (no group rebalancing).
        ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topic, partition));
        containerProps.setGroupId(taskId + ":" + Util.uniqueFirstPart() + ":" + partition);
        Map<String, Object> kafkaConsumerProperties = subKafkaConsumerConfig.kafkaConsumerProperties();
        if (kafkaConsumerProperties != null) {
            // A config without extra consumer properties is treated as "no overrides".
            containerProps.getKafkaConsumerProperties().putAll(kafkaConsumerProperties);
        }

        // State shared between this (task) thread and the container's listener callbacks.
        AtomicLong count = new AtomicLong();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicBoolean cancelled = new AtomicBoolean(false);
        AtomicLong latestTime = new AtomicLong(System.currentTimeMillis());
        containerProps.setMessageListener(new SimpleMessageListener(this, count, countDownLatch, cancelled, taskContext, subKafkaDataListener, subKafkaConsumerConfig, latestTime, subKafkaConsumerFactory));

        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(subKafkaConsumerFactory, containerProps);
        // Container lifecycle events are deliberately discarded.
        container.setApplicationEventPublisher(event -> {});
        container.start();
        announce(taskContext, "start kafka consumer: " + topic + ":" + partition);

        boolean interrupted = false;
        try {
            // Wake up at least every AWAIT_INTERVAL_SECONDS to re-evaluate the stop conditions.
            while (!(countDownLatch.await(AWAIT_INTERVAL_SECONDS, TimeUnit.SECONDS)
                    || cancelled.get()
                    || taskContext.isCancelled()
                    || idleExpired(latestTime))) {
                // keep waiting
            }
            if (taskContext.isCancelled()) {
                announce(taskContext, "task is cancelled by remote, end");
            }
            if (cancelled.get()) {
                announce(taskContext, "task is cancelled by self, end");
            }
            if (idleExpired(latestTime)) {
                // Make sure any listener callback still in flight stops delegating records.
                cancelled.set(true);
                announce(taskContext, "task is expired (" + Util.formatTime((LocalDateTime) LocalDateTime.now(), (String[]) new String[0]) + " - " + Util.formatTime((LocalDateTime) Util.dateTime(latestTime.get()), (String[]) new String[0]) + " > " + IDLE_EXPIRY_MS + " ms ), end");
            }
        } catch (InterruptedException e) {
            interrupted = true;
            log.error(e.getMessage(), e);
            throw Util.convert2RuntimeException((Throwable) e);
        } finally {
            try {
                container.stop();
                announce(taskContext, "close kafka consumer: " + topic + ":" + partition);
            } finally {
                if (interrupted) {
                    // Restore the interrupt status only after the container has been stopped, so
                    // the shutdown itself is not cut short by the pending interrupt.
                    Thread.currentThread().interrupt();
                }
            }
        }

        boolean running = container.isRunning();
        Util.logLink("container is running?: " + running + ", consume data count: " + count.get()).log(log::info).log(taskContext::appendLog);
        subKafkaDataListener.afterEndSubKafka(taskContext);
    }

    /**
     * Writes a lifecycle message to the application log and to the task's own log.
     *
     * <p>NOTE(review): the message is appended to the task log twice, exactly as the original
     * call chains did. This looks like an accidental duplicate, but it is preserved here because
     * the intent cannot be confirmed from this file.
     */
    private static void announce(JobService.TaskContext taskContext, String message) {
        Util.logLink(message).log(log::info).log(taskContext::appendLog).log(taskContext::appendLog);
    }

    /** Returns whether more than {@link #IDLE_EXPIRY_MS} ms have passed since the last consumed record. */
    private static boolean idleExpired(AtomicLong latestTime) {
        return System.currentTimeMillis() - latestTime.get() > IDLE_EXPIRY_MS;
    }

    /**
     * Supplies the topic, partition, start time and extra consumer properties for the given task.
     *
     * @param var1 context of the task being processed
     * @return the consumer configuration for this task
     */
    protected abstract SubKafkaConsumerConfig subKafkaConsumerConfig(JobService.TaskContext var1);

    /**
     * Supplies the factory used to create the per-task Kafka consumer.
     *
     * @return the consumer factory; its {@code bootstrap.servers} setting is also used to look
     *         up offsets by timestamp
     */
    protected abstract ConsumerFactory<String, String> subKafkaConsumerFactory();

    /**
     * Supplies the listener that receives lifecycle callbacks and consumed records for the task.
     *
     * @param var1 context of the task being processed
     * @return the data listener for this task
     */
    protected abstract SubKafkaDataListener subKafkaDataListener(JobService.TaskContext var1);

    /** Callbacks invoked around and during the per-task Kafka consumption. */
    public static interface SubKafkaDataListener {
        /** Called once per task, before the consumer configuration is read and the consumer starts. */
        public void beforeStartSubKafka(JobService.TaskContext var1);

        /**
         * Called for every consumed record while the task is not cancelled.
         *
         * @param var1 context of the running task
         * @param var2 access to the record and a handle to cancel the run
         */
        public void onData(JobService.TaskContext var1, ConsumerRecordContext var2);

        /** Called once after the consumer has been stopped following a normal end of the wait loop. */
        public void afterEndSubKafka(JobService.TaskContext var1);
    }

    /** Per-task description of what to consume. */
    public static interface SubKafkaConsumerConfig
    extends Model {
        /** Topic to consume. */
        public String topic();

        /** Partition of {@link #topic()} to consume. */
        public int partition();

        /** Not read anywhere in this class. */
        public long offset();

        /**
         * Timestamp in epoch milliseconds used to position the consumer when the partition is
         * assigned; values {@code <= 0} leave the position untouched.
         */
        public long startTimeMs();

        /** Not read anywhere in this class. */
        public long endTimeMs();

        /** Extra Kafka consumer properties copied onto the container; may be {@code null} for none. */
        public Map<String, Object> kafkaConsumerProperties();
    }

    /**
     * Message listener installed on the per-task container. It tracks progress, honours
     * cancellation, forwards records to the {@link SubKafkaDataListener} and, on partition
     * assignment, seeks to the offset matching the configured start timestamp.
     */
    class SimpleMessageListener
    implements AcknowledgingMessageListener<String, String>,
    ConsumerSeekAware {
        final AtomicLong count;
        final CountDownLatch countDownLatch;
        final AtomicBoolean cancelled;
        final JobService.TaskContext taskContext;
        final SubKafkaDataListener subKafkaDataListener;
        final SubKafkaConsumerConfig subKafkaConsumerConfig;
        final AtomicLong latestTime;
        final ConsumerFactory<String, String> consumerFactory;

        /**
         * @param enclosing the owning listener instance; accepted for signature compatibility and not used
         * @param count number of records received so far
         * @param countDownLatch released to wake the task thread when the run should end
         * @param cancelled set once the run has been cancelled or has expired
         * @param taskContext context of the running task
         * @param subKafkaDataListener receiver of the consumed records
         * @param subKafkaConsumerConfig consumer configuration of the running task
         * @param latestTime wall-clock time (ms) of the most recently received record
         * @param consumerFactory factory whose {@code bootstrap.servers} is used for offset lookups
         */
        public SimpleMessageListener(HeavyPerTopicPartitionTaskListener enclosing, AtomicLong count, CountDownLatch countDownLatch, AtomicBoolean cancelled, JobService.TaskContext taskContext, SubKafkaDataListener subKafkaDataListener, SubKafkaConsumerConfig subKafkaConsumerConfig, AtomicLong latestTime, ConsumerFactory<String, String> consumerFactory) {
            this.count = count;
            this.countDownLatch = countDownLatch;
            this.cancelled = cancelled;
            this.taskContext = taskContext;
            this.subKafkaDataListener = subKafkaDataListener;
            this.subKafkaConsumerConfig = subKafkaConsumerConfig;
            this.latestTime = latestTime;
            this.consumerFactory = consumerFactory;
        }

        /**
         * Handles one record: updates the counters, stops early if the run is cancelled, and
         * otherwise hands the record to the data listener with trace identifiers in the MDC.
         *
         * <p>Exceptions thrown while processing are logged and not propagated. The
         * {@code acknowledgment} argument is not used by this method.
         *
         * @param record the consumed record
         * @param acknowledgment acknowledgment handle supplied by the container (unused)
         */
        @Override
        public void onMessage(final ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
            String topic = record.topic();
            try {
                TaskGetResponse.Task task = this.taskContext.task();
                this.count.incrementAndGet();
                this.latestTime.set(System.currentTimeMillis());
                if (this.cancelled.get()) {
                    this.countDownLatch.countDown();
                    return;
                }
                if (this.taskContext.isCancelled()) {
                    this.cancelled.set(true);
                    this.countDownLatch.countDown();
                    return;
                }
                long offset = record.offset();
                String taskKey = "_job4kafka_" + task.getJobId() + "_task_" + task.getId() + "_" + offset;
                MDC.put("traceId", taskKey);
                MDC.put("requestId", taskKey);
                this.subKafkaDataListener.onData(this.taskContext, new ConsumerRecordContext() {

                    @Override
                    public ConsumerRecord<String, String> record() {
                        return record;
                    }

                    @Override
                    public void cancel() {
                        SimpleMessageListener.this.cancelled.set(true);
                        SimpleMessageListener.this.countDownLatch.countDown();
                    }
                });
            } catch (Exception e) {
                log.error("Error processing record from topic: {}", topic, e);
            } finally {
                MDC.remove("traceId");
                MDC.remove("requestId");
            }
        }

        /**
         * Positions each assigned partition at the first offset whose timestamp is at or after
         * the configured start time, or at the end of the partition when no such offset exists.
         * Does nothing when the configured start time is {@code <= 0}.
         *
         * <p>The lookup uses a short-lived consumer created from the factory's
         * {@code bootstrap.servers} value only; other factory settings are not copied to it.
         *
         * @param assignments the assigned partitions and their current positions
         * @param callback seek callback supplied by the container
         * @throws IllegalStateException if the consumer factory has no {@code bootstrap.servers}
         */
        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
            long startTimeMs = this.subKafkaConsumerConfig.startTimeMs();
            if (startTimeMs <= 0L) {
                return;
            }
            // Kept as Object: Kafka accepts bootstrap.servers either as a comma-separated String
            // or as a List, so casting to String would fail for list-based configurations.
            Object bootstrap = this.consumerFactory.getConfigurationProperties().get("bootstrap.servers");
            if (bootstrap == null) {
                throw new IllegalStateException("consumer factory has no bootstrap.servers configured");
            }
            Properties tempProps = new Properties();
            tempProps.put("bootstrap.servers", bootstrap);
            tempProps.put("key.deserializer", StringDeserializer.class);
            tempProps.put("value.deserializer", StringDeserializer.class);
            try (KafkaConsumer<String, String> tempConsumer = new KafkaConsumer<>(tempProps)) {
                Map<TopicPartition, Long> timestamps = assignments.keySet().stream().collect(Collectors.toMap(tp -> tp, tp -> startTimeMs));
                Map<TopicPartition, OffsetAndTimestamp> offsets = tempConsumer.offsetsForTimes(timestamps);
                for (TopicPartition tp : assignments.keySet()) {
                    OffsetAndTimestamp ot = offsets.get(tp);
                    if (ot != null) {
                        callback.seek(tp.topic(), tp.partition(), ot.offset());
                        log.info("seek offset ({}) of timestamp ( {} ), {}:{}", ot.offset(), startTimeMs, tp.topic(), tp.partition());
                        continue;
                    }
                    // No record at or after the start time: start from the end of the partition.
                    callback.seekToEnd(tp.topic(), tp.partition());
                    log.info("seek offset (end) of timestamp ( {} ), {}:{}", startTimeMs, tp.topic(), tp.partition());
                }
            }
            log.info("startTimeMs: {}", startTimeMs);
        }
    }

    /** View of a single consumed record handed to {@link SubKafkaDataListener#onData}. */
    public static interface ConsumerRecordContext {
        /** Returns the consumed record. */
        public ConsumerRecord<String, String> record();

        /** Marks the run as cancelled and wakes the waiting task thread so the consumer is stopped. */
        public void cancel();
    }
}

