/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.jobhub.sdk.client.kafka;

import com.ovopark.jobhub.sdk.client.JobLog;
import com.ovopark.jobhub.sdk.client.JobService;
import com.ovopark.jobhub.sdk.client.kafka.TaskListenerRunnerViaKafkaProvider;
import com.ovopark.jobhub.sdk.model.TaskGetResponse;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.Model;
import com.ovopark.kernel.shared.OnlyPrivate;
import com.ovopark.kernel.shared.ServiceProvider;
import com.ovopark.kernel.shared.Util;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.TopicPartitionOffset;

public abstract class HeavyPerTopicPartitionTaskListener
implements JobService.TaskListener {
    private static final Logger log = LoggerFactory.getLogger(HeavyPerTopicPartitionTaskListener.class);
    @Autowired
    protected JobService jobService;

    /** Idle window in milliseconds: waiting ends once no record has been seen for longer than this. */
    private static final long IDLE_TIMEOUT_MS = 60000L;

    /** Longest time, in seconds, a single latch wait blocks before the stop conditions are re-checked. */
    private static final long POLL_INTERVAL_SECONDS = 30L;

    /**
     * Processes one task by consuming a single topic partition through a dedicated
     * {@link KafkaMessageListenerContainer}.
     *
     * <p>The method returns immediately when the task carries no JSON arguments. Otherwise it
     * calls {@code beforeStartSubKafka}, resolves the consumer configuration, optionally checks
     * that the partition exists, starts a container that is manually assigned to that partition,
     * and then blocks until one of the following happens: the sub context is cancelled, the task
     * is cancelled, or no record has been received for more than {@link #IDLE_TIMEOUT_MS}.
     * The container is always stopped and {@code afterEndSubKafka} is always called once the
     * container has been started.
     *
     * <p>If the waiting thread is interrupted, the interrupt status is restored after the
     * cleanup has run, so callers can still observe it.
     *
     * @param taskContext context of the task being executed
     */
    public void on(JobService.TaskContext taskContext) {
        TaskGetResponse.Task task = taskContext.task();
        log.info("process task: {}", JSONAccessor.impl().format((Object)task));
        String taskId = task.getId();
        String jsonStr = task.getJsonStr();
        if (Util.isEmpty((CharSequence)jsonStr)) {
            log.info("args is empty???{}", JSONAccessor.impl().format((Object)task));
            return;
        }
        SubKafkaDataListener subKafkaDataListener = this.subKafkaDataListener(taskContext);
        subKafkaDataListener.beforeStartSubKafka(taskContext);
        SubKafkaConsumerConfig subKafkaConsumerConfig = this.subKafkaConsumerConfig(taskContext);
        String topic = subKafkaConsumerConfig.topic();
        int partition = subKafkaConsumerConfig.partition();
        ConsumerFactory<?, ?> subKafkaConsumerFactory = this.subKafkaConsumerFactory();
        // NOTE(review): returning here skips afterEndSubKafka although beforeStartSubKafka has
        // already run - confirm that listeners do not rely on the two calls being paired.
        if (subKafkaConsumerConfig.checkPartitionRange() && !partitionExists(subKafkaConsumerFactory, topic, partition)) {
            return;
        }

        ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset[]{new TopicPartitionOffset(topic, partition)});
        containerProps.setGroupId(taskId + ":" + Util.uniqueFirstPart() + ":" + partition);
        Map<String, Object> kafkaConsumerProperties = subKafkaConsumerConfig.kafkaConsumerProperties();
        // A configuration without extra properties may hand back null; treat that as "nothing to copy".
        if (kafkaConsumerProperties != null) {
            kafkaConsumerProperties.forEach(containerProps.getKafkaConsumerProperties()::put);
        }
        containerProps.getKafkaConsumerProperties().put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

        CountDownLatch countDownLatch = new CountDownLatch(1);
        SubKafkaContextImpl subKafkaContext = new SubKafkaContextImpl(this, topic, partition, countDownLatch);
        containerProps.setMessageListener((Object)new PerPartitionSeekOffsetMessageListener(taskContext, subKafkaDataListener, subKafkaConsumerConfig, subKafkaConsumerFactory, subKafkaContext));
        // Raw type kept on purpose: the factory is only known as ConsumerFactory<?, ?>.
        KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(subKafkaConsumerFactory, containerProps);
        container.setApplicationEventPublisher(event -> log.info("event: " + event.getClass().getName()));
        container.start();
        logToAllSinks(taskContext, "start kafka consumer: " + topic + ":" + partition + ", config: " + JSONAccessor.impl().format((Object)subKafkaConsumerConfig));

        boolean interrupted = false;
        try {
            // Wake up every POLL_INTERVAL_SECONDS to re-check cancellation and idleness.
            while (!countDownLatch.await(POLL_INTERVAL_SECONDS, TimeUnit.SECONDS)
                    && !subKafkaContext.isCancelled()
                    && !taskContext.isCancelled()
                    && System.currentTimeMillis() - subKafkaContext.latestTime.get() <= IDLE_TIMEOUT_MS) {
                // nothing to do between checks
            }
            if (taskContext.isCancelled()) {
                logToAllSinks(taskContext, "task is cancelled by remote, end");
            }
            if (subKafkaContext.isCancelled()) {
                logToAllSinks(taskContext, "task is cancelled by self, end");
            }
            if (System.currentTimeMillis() - subKafkaContext.latestTime.get() > IDLE_TIMEOUT_MS) {
                // Flag the context so records still in flight are counted as skipped.
                subKafkaContext.cancelled.set(true);
                logToAllSinks(taskContext, "task is expired (" + Util.formatTime((LocalDateTime)LocalDateTime.now(), (String[])new String[0]) + " - " + Util.formatTime((LocalDateTime)Util.dateTime((long)subKafkaContext.latestTime.get()), (String[])new String[0]) + " > " + IDLE_TIMEOUT_MS + " ms ), end");
            }
        }
        catch (InterruptedException e) {
            // Remember the interrupt; it is re-asserted after cleanup so that container.stop()
            // below is not cut short by an already-set interrupt flag.
            interrupted = true;
            log.error(e.getMessage(), (Throwable)e);
            throw Util.convert2RuntimeException((Throwable)e);
        }
        catch (Exception e) {
            log.error(e.getMessage(), (Throwable)e);
            throw Util.convert2RuntimeException((Throwable)e);
        }
        finally {
            try {
                container.stop();
            }
            catch (Exception e) {
                log.error(e.getMessage(), (Throwable)e);
            }
            try {
                subKafkaDataListener.afterEndSubKafka(taskContext);
            }
            catch (Exception e) {
                log.error(e.getMessage(), (Throwable)e);
            }
            logToAllSinks(taskContext, "close kafka consumer: " + topic + ":" + partition);
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        boolean running = container.isRunning();
        logToAllSinks(taskContext, "container is running?: " + running + ", consume data,  recordCount: " + subKafkaContext.recordCount.get() + ", skipRecordCount: " + subKafkaContext.skipRecordCount.get());
    }

    /**
     * Checks, with a short-lived consumer, that {@code topic} has partition metadata and that
     * {@code partition} is below the partition count. Negative partition numbers are not rejected.
     *
     * @return {@code false} (after logging the reason) when the topic has no partitions or the
     *         partition index is out of range, {@code true} otherwise
     */
    private static boolean partitionExists(ConsumerFactory<?, ?> consumerFactory, String topic, int partition) {
        try (Consumer<?, ?> tempConsumer = consumerFactory.createConsumer()) {
            List<?> partitions = tempConsumer.partitionsFor(topic);
            if (Util.isEmpty((Collection<?>)partitions)) {
                log.info("topic does not exists?: " + topic);
                return false;
            }
            if (partition >= partitions.size()) {
                log.info(topic + " , exceed max partitions?: " + partition + " > " + partitions.size());
                return false;
            }
            return true;
        }
    }

    /** Writes {@code message} to the class logger, the task's job log and the task's appended log. */
    private static void logToAllSinks(JobService.TaskContext taskContext, String message) {
        Util.logLink(message)
                .log(line -> log.info(line))
                .log(line -> ((JobLog)taskContext.jobLog()).log(line))
                .log(line -> taskContext.appendLog(line));
    }

    /**
     * Supplies the consumer configuration (topic, partition, seek position, extra consumer
     * properties) for the given task. Called by {@code on} after
     * {@code beforeStartSubKafka} has run.
     *
     * @param var1 context of the task being executed
     * @return the configuration used to build the listener container
     */
    protected abstract SubKafkaConsumerConfig subKafkaConsumerConfig(JobService.TaskContext var1);

    /**
     * Supplies the factory used both for the listener container and for the short-lived
     * consumers that look up partition metadata and offsets.
     *
     * @return the consumer factory
     */
    protected abstract ConsumerFactory<?, ?> subKafkaConsumerFactory();

    /**
     * Supplies the listener that receives lifecycle callbacks and the consumed records for
     * the given task.
     *
     * @param var1 context of the task being executed
     * @return the data listener for this task
     */
    protected abstract SubKafkaDataListener subKafkaDataListener(JobService.TaskContext var1);

    /**
     * Callbacks through which a task receives the records of its topic partition.
     */
    public interface SubKafkaDataListener {
        /**
         * Called once per task, before the consumer configuration is resolved and before the
         * listener container is created.
         *
         * @param taskContext context of the task being executed
         */
        void beforeStartSubKafka(JobService.TaskContext taskContext);

        /**
         * Called for every record that is handed on by the message listener, i.e. records that
         * were not skipped because of cancellation or the end-offset limit.
         *
         * @param taskContext context of the task being executed
         * @param recordContext the record together with the controls of the sub consumer
         */
        void onData(JobService.TaskContext taskContext, ConsumerRecordContext recordContext);

        /**
         * Called after the listener container has been asked to stop.
         *
         * @param taskContext context of the task being executed
         */
        void afterEndSubKafka(JobService.TaskContext taskContext);
    }

    public static interface SubKafkaConsumerConfig
    extends Model {
        public String topic();

        public int partition();

        public long offset();

        public long startTimeMs();

        public long endTimeMs();

        public Map<String, Object> kafkaConsumerProperties();

        default public boolean checkPartitionRange() {
            return true;
        }
    }

    /**
     * Mutable state of one sub consumer: cancellation flag, record counters, the time of the
     * most recent record and the resolved start/end offsets. The counters and the timestamp
     * are atomics; {@code startOffset} and {@code endOffset} are plain fields that the message
     * listener assigns directly.
     */
    private class SubKafkaContextImpl
    implements SubKafkaContext {
        private final String topic;
        private final int partition;
        private final AtomicBoolean cancelled = new AtomicBoolean(false);
        private final CountDownLatch countDownLatch;
        private final AtomicLong recordCount = new AtomicLong();
        private final AtomicLong skipRecordCount = new AtomicLong();
        // Starts at construction time so the idle check has a baseline before any record arrives.
        private final AtomicLong latestTime = new AtomicLong(System.currentTimeMillis());
        private long startOffset = -1L;
        private long endOffset = -1L;

        /**
         * @param heavyPerTopicPartitionTaskListener not used by this constructor
         * @param topic topic being consumed
         * @param partition partition being consumed
         * @param countDownLatch latch released by {@link #cancel()}
         */
        public SubKafkaContextImpl(HeavyPerTopicPartitionTaskListener heavyPerTopicPartitionTaskListener, String topic, int partition, CountDownLatch countDownLatch) {
            this.topic = topic;
            this.partition = partition;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public String topic() {
            return topic;
        }

        @Override
        public int partition() {
            return partition;
        }

        /** Sets the cancelled flag first, then releases the latch. */
        @Override
        public void cancel() {
            cancelled.set(true);
            countDownLatch.countDown();
        }

        @Override
        public boolean isCancelled() {
            return cancelled.get();
        }

        @Override
        public void incrementRecordCount() {
            recordCount.incrementAndGet();
        }

        @Override
        public void incrementSkipRecordCount() {
            skipRecordCount.incrementAndGet();
        }

        @Override
        public void latestTime(long l) {
            latestTime.set(l);
        }

        /** @return the start offset, or {@code -1} while none has been assigned */
        @Override
        public long startOffset() {
            return startOffset;
        }

        /** @return the end offset, or {@code -1} while none has been assigned */
        @Override
        public long endOffset() {
            return endOffset;
        }
    }

    /**
     * Message listener of the per-task container. On partition assignment it seeks to the
     * configured offset or start timestamp and records the end offset; for every record it
     * updates the progress statistics and forwards the record to the
     * {@link SubKafkaDataListener} unless consumption has been cancelled or the end offset
     * has been passed.
     */
    class PerPartitionSeekOffsetMessageListener
    implements AcknowledgingMessageListener<String, String>,
    ConsumerSeekAware {
        final JobService.TaskContext taskContext;
        final SubKafkaDataListener subKafkaDataListener;
        final SubKafkaConsumerConfig subKafkaConsumerConfig;
        final ConsumerFactory<?, ?> consumerFactory;
        final SubKafkaContextImpl subKafkaContext;

        public PerPartitionSeekOffsetMessageListener(JobService.TaskContext taskContext, SubKafkaDataListener subKafkaDataListener, SubKafkaConsumerConfig subKafkaConsumerConfig, ConsumerFactory<?, ?> consumerFactory, SubKafkaContextImpl subKafkaContext) {
            this.taskContext = taskContext;
            this.subKafkaDataListener = subKafkaDataListener;
            this.subKafkaConsumerConfig = subKafkaConsumerConfig;
            this.consumerFactory = consumerFactory;
            this.subKafkaContext = subKafkaContext;
        }

        /**
         * Handles one record. Exceptions thrown while counting, checking or dispatching the
         * record are logged and not rethrown. The MDC keys {@code traceId} and
         * {@code requestId} are always removed before returning.
         *
         * @param record the consumed record
         * @param acknowledgment not used by this listener
         */
        @Override
        public void onMessage(final ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
            String topic = record.topic();
            final Thread consumerThread = Thread.currentThread();
            TaskGetResponse.Task task = this.taskContext.task();
            String taskKey = "_job4kafka_" + task.getJobId() + "_task_" + task.getId();
            MDC.put("traceId", taskKey);
            MDC.put("requestId", taskKey);
            try {
                // Lets the task context report the consumer thread's name and current stack on demand.
                this.taskContext.captureSupplier((Supplier)new ServiceProvider<Map<String, Object>>(){

                    public Map<String, Object> get() {
                        HashMap<String, Object> data = new HashMap<String, Object>();
                        data.put("subIoThread", consumerThread.getName() + "@" + consumerThread.hashCode());
                        StackTraceElement[] frames = consumerThread.getStackTrace();
                        List<String> stackList = new ArrayList<String>(frames.length);
                        for (StackTraceElement frame : frames) {
                            stackList.add(frame.toString());
                        }
                        data.put("subIoStack", stackList);
                        return data;
                    }
                });
                TaskListenerRunnerViaKafkaProvider.KafkaTaskContext kafkaTaskContext = (TaskListenerRunnerViaKafkaProvider.KafkaTaskContext)this.taskContext;
                SubProgressStat subProgressStat = kafkaTaskContext.getSubProgressStat();
                long offset = record.offset();
                // A start offset of 0 is treated as "not set yet" and replaced by the first record seen.
                if (subProgressStat.getStartOffset() == 0L) {
                    subProgressStat.setStartOffset(offset);
                }
                subProgressStat.setOffset(offset);
                try {
                    this.dispatch(record, task, offset);
                }
                catch (Exception e) {
                    log.error("Error processing record from topic: {}", (Object)topic, (Object)e);
                }
            }
            finally {
                MDC.remove("traceId");
                MDC.remove("requestId");
            }
        }

        /**
         * Counts the record, then either skips it (cancelled context, cancelled task, or
         * offset beyond the end offset) or forwards it to the data listener.
         */
        private void dispatch(final ConsumerRecord<String, String> record, TaskGetResponse.Task task, long offset) {
            this.subKafkaContext.incrementRecordCount();
            this.subKafkaContext.latestTime(System.currentTimeMillis());
            if (this.subKafkaContext.isCancelled()) {
                this.subKafkaContext.incrementSkipRecordCount();
                return;
            }
            if (this.taskContext.isCancelled()) {
                this.subKafkaContext.cancel();
                this.subKafkaContext.incrementSkipRecordCount();
                return;
            }
            long endOffset = this.subKafkaContext.endOffset();
            // NOTE(review): a record whose offset equals endOffset is still delivered; only
            // strictly greater offsets stop consumption. Confirm the inclusive bound is
            // intended given how endOffsets()/offsetsForTimes() define the returned offset.
            if (endOffset > -1L && endOffset < offset) {
                this.subKafkaContext.cancel();
                this.subKafkaContext.incrementSkipRecordCount();
                log.info("exceed max offset: " + offset + " > " + endOffset);
                return;
            }
            String recordKey = "_job4kafka_" + task.getJobId() + "_task_" + task.getId() + ":" + offset;
            MDC.put("traceId", recordKey);
            MDC.put("requestId", recordKey);
            // The context class's first constructor argument is the enclosing task listener,
            // not this message listener.
            this.subKafkaDataListener.onData(this.taskContext, new ConsumerRecordContextImpl(HeavyPerTopicPartitionTaskListener.this, this.subKafkaContext){

                @Override
                public ConsumerRecord<String, String> record() {
                    return record;
                }
            });
        }

        /**
         * Positions the consumer for the assigned partitions. An explicit offset (greater than
         * zero) wins; otherwise the start and end timestamps, when greater than zero, are
         * translated into offsets with a temporary consumer.
         *
         * @param assignments the assigned partitions
         * @param callback callback used to seek
         */
        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
            TaskListenerRunnerViaKafkaProvider.KafkaTaskContext kafkaTaskContext = (TaskListenerRunnerViaKafkaProvider.KafkaTaskContext)this.taskContext;
            SubProgressStat subProgressStat = kafkaTaskContext.getSubProgressStat();
            long startTimeMs = this.subKafkaConsumerConfig.startTimeMs();
            long endTimeMs = this.subKafkaConsumerConfig.endTimeMs();
            long defOffset = this.subKafkaConsumerConfig.offset();
            if (defOffset > 0L) {
                for (TopicPartition tp : assignments.keySet()) {
                    callback.seek(tp.topic(), tp.partition(), defOffset);
                    this.subKafkaContext.startOffset = defOffset;
                    subProgressStat.setStartOffset(defOffset);
                    log.info("seek offset (" + defOffset + "), " + tp.topic() + ":" + tp.partition());
                }
                log.info("cannot set end offset, use default time expired???");
                return;
            }
            if (startTimeMs <= 0L && endTimeMs <= 0L) {
                return;
            }
            try (Consumer<?, ?> tempConsumer = this.consumerFactory.createConsumer()) {
                if (startTimeMs > 0L) {
                    this.seekToStartTime(tempConsumer, assignments, callback, subProgressStat, startTimeMs);
                }
                if (endTimeMs > 0L) {
                    this.resolveEndOffset(tempConsumer, assignments, subProgressStat, endTimeMs);
                } else {
                    log.info("cannot set end offset, use default time expired???");
                }
            }
        }

        /** Seeks each partition to the offset found for {@code startTimeMs}, or to the end when none is found. */
        private void seekToStartTime(Consumer<?, ?> consumer, Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback, SubProgressStat subProgressStat, long startTimeMs) {
            Map<TopicPartition, OffsetAndTimestamp> startOffsets = consumer.offsetsForTimes(this.timestampQuery(assignments, startTimeMs));
            for (TopicPartition tp : assignments.keySet()) {
                OffsetAndTimestamp ot = startOffsets.get(tp);
                if (ot == null) {
                    callback.seekToEnd(tp.topic(), tp.partition());
                    log.info("seek offset (end) of timestamp ( " + startTimeMs + " ), " + tp.topic() + ":" + tp.partition());
                    continue;
                }
                callback.seek(tp.topic(), tp.partition(), ot.offset());
                this.subKafkaContext.startOffset = ot.offset();
                subProgressStat.setStartOffset(ot.offset());
                log.info("seek offset (" + ot.offset() + ") of timestamp ( " + startTimeMs + " ), " + tp.topic() + ":" + tp.partition());
            }
        }

        /**
         * Records the end offset: first the partition's current end offset as a fallback,
         * then the offset found for {@code endTimeMs} when there is one.
         */
        private void resolveEndOffset(Consumer<?, ?> consumer, Map<TopicPartition, Long> assignments, SubProgressStat subProgressStat, long endTimeMs) {
            Map<TopicPartition, Long> latestOffsets = consumer.endOffsets(assignments.keySet());
            latestOffsets.forEach((tp, latest) -> {
                this.subKafkaContext.endOffset = latest;
                subProgressStat.setEndOffset(latest);
                log.info("first , set default latest end offset (" + latest + "), " + tp.topic() + ":" + tp.partition());
            });
            Map<TopicPartition, OffsetAndTimestamp> endOffsets = consumer.offsetsForTimes(this.timestampQuery(assignments, endTimeMs));
            for (TopicPartition tp : assignments.keySet()) {
                OffsetAndTimestamp ot = endOffsets.get(tp);
                if (ot == null) {
                    continue;
                }
                this.subKafkaContext.endOffset = ot.offset();
                subProgressStat.setEndOffset(ot.offset());
                // Reports the end timestamp that was actually looked up.
                log.info("again ,set real end offset (" + ot.offset() + ") of timestamp ( " + endTimeMs + " ), " + tp.topic() + ":" + tp.partition());
            }
        }

        /** Builds the argument for {@code offsetsForTimes}: every assigned partition mapped to {@code timestampMs}. */
        private Map<TopicPartition, Long> timestampQuery(Map<TopicPartition, Long> assignments, long timestampMs) {
            return assignments.keySet().stream().collect(Collectors.toMap(tp -> tp, tp -> timestampMs));
        }
    }

    public static interface SubKafkaContext {
        public String topic();

        public int partition();

        public void cancel();

        public boolean isCancelled();

        @OnlyPrivate
        public void incrementRecordCount();

        public void incrementSkipRecordCount();

        public void latestTime(long var1);

        default public long startOffset() {
            return -1L;
        }

        default public long endOffset() {
            return -1L;
        }
    }

    /**
     * Progress of a sub consumer: the offset of the latest record and the start and end
     * offsets of the range being consumed. Equality, hash code and string form are based on
     * the three getter values.
     */
    public static class SubProgressStat
    implements Model {
        private long offset;
        private long startOffset;
        private long endOffset;

        public long getOffset() {
            return offset;
        }

        public long getStartOffset() {
            return startOffset;
        }

        public long getEndOffset() {
            return endOffset;
        }

        public void setOffset(long offset) {
            this.offset = offset;
        }

        public void setStartOffset(long startOffset) {
            this.startOffset = startOffset;
        }

        public void setEndOffset(long endOffset) {
            this.endOffset = endOffset;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof SubProgressStat)) {
                return false;
            }
            SubProgressStat that = (SubProgressStat)o;
            // canEqual lets a subclass veto equality with instances of this class.
            return that.canEqual(this)
                    && getOffset() == that.getOffset()
                    && getStartOffset() == that.getStartOffset()
                    && getEndOffset() == that.getEndOffset();
        }

        protected boolean canEqual(Object other) {
            return other instanceof SubProgressStat;
        }

        public int hashCode() {
            // Long.hashCode(v) is (int)(v ^ (v >>> 32)); the fold starts at 1 and multiplies by 59.
            int result = 59 + Long.hashCode(getOffset());
            result = 59 * result + Long.hashCode(getStartOffset());
            result = 59 * result + Long.hashCode(getEndOffset());
            return result;
        }

        public String toString() {
            StringBuilder text = new StringBuilder("HeavyPerTopicPartitionTaskListener.SubProgressStat(offset=");
            text.append(getOffset());
            text.append(", startOffset=").append(getStartOffset());
            text.append(", endOffset=").append(getEndOffset());
            return text.append(")").toString();
        }
    }

    /**
     * Base of the per-record context handed to {@link SubKafkaDataListener#onData}: every
     * {@link SubKafkaContext} method is forwarded to the wrapped context, and subclasses
     * supply the record itself.
     */
    private abstract class ConsumerRecordContextImpl
    implements ConsumerRecordContext {
        final SubKafkaContext subKafkaContext;

        /**
         * @param heavyPerTopicPartitionTaskListener not used by this constructor
         * @param subKafkaContext the context all calls are forwarded to
         */
        public ConsumerRecordContextImpl(HeavyPerTopicPartitionTaskListener heavyPerTopicPartitionTaskListener, SubKafkaContext subKafkaContext) {
            this.subKafkaContext = subKafkaContext;
        }

        @Override
        public String topic() {
            return this.subKafkaContext.topic();
        }

        @Override
        public int partition() {
            return this.subKafkaContext.partition();
        }

        @Override
        public void cancel() {
            this.subKafkaContext.cancel();
        }

        @Override
        public boolean isCancelled() {
            return this.subKafkaContext.isCancelled();
        }

        @Override
        public void incrementRecordCount() {
            this.subKafkaContext.incrementRecordCount();
        }

        @Override
        public void incrementSkipRecordCount() {
            this.subKafkaContext.incrementSkipRecordCount();
        }

        @Override
        public void latestTime(long latestTime) {
            this.subKafkaContext.latestTime(latestTime);
        }

        /**
         * Forwards to the wrapped context; without this override the interface default of
         * {@code -1} would hide the wrapped context's value.
         */
        @Override
        public long startOffset() {
            return this.subKafkaContext.startOffset();
        }

        /**
         * Forwards to the wrapped context; without this override the interface default of
         * {@code -1} would hide the wrapped context's value.
         */
        @Override
        public long endOffset() {
            return this.subKafkaContext.endOffset();
        }
    }

    /**
     * A {@link SubKafkaContext} that additionally exposes the record currently being handled.
     */
    public interface ConsumerRecordContext
    extends SubKafkaContext {
        /** @return the record being handled */
        ConsumerRecord<String, String> record();
    }
}

