/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.jobhub.sdk.client.kafka;

import com.ovopark.jobhub.sdk.client.Client2ControlTransport;
import com.ovopark.jobhub.sdk.client.ClientNodeProvider;
import com.ovopark.jobhub.sdk.client.JobClientActive;
import com.ovopark.jobhub.sdk.client.JobLog;
import com.ovopark.jobhub.sdk.client.JobLogImpl;
import com.ovopark.jobhub.sdk.client.JobService;
import com.ovopark.jobhub.sdk.client.JobServiceImpl;
import com.ovopark.jobhub.sdk.client.kafka.HeavyPerTopicPartitionTaskListener;
import com.ovopark.jobhub.sdk.model.JobStatus;
import com.ovopark.jobhub.sdk.model.TaskGetResponse;
import com.ovopark.jobhub.sdk.model.TaskHeartbeatRequest;
import com.ovopark.jobhub.sdk.model.TaskHeartbeatResponse;
import com.ovopark.jobhub.sdk.model.TaskIfDoneRequest;
import com.ovopark.jobhub.sdk.model.TaskIfDoneResponse;
import com.ovopark.jobhub.sdk.model.TaskLogPutRequest;
import com.ovopark.jobhub.sdk.model.TaskLogPutResponse;
import com.ovopark.jobhub.sdk.model.TaskMetaGetRequest;
import com.ovopark.jobhub.sdk.model.TaskMetaGetResponse;
import com.ovopark.jobhub.sdk.model.TaskUpdateRequest;
import com.ovopark.jobhub.sdk.model.TaskUpdateResponse;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.Model;
import com.ovopark.kernel.shared.ServiceProvider;
import com.ovopark.kernel.shared.ShutdownManager;
import com.ovopark.kernel.shared.Util;
import com.ovopark.kernel.shared.kv.KVEngine;
import com.ovopark.kernel.shared.vclient.ClientNode;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.stereotype.Component;

@JobClientActive
@Component
public class TaskListenerRunnerViaKafkaProvider
implements JobService.TaskListenerRunnerProvider,
CommandLineRunner {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(TaskListenerRunnerViaKafkaProvider.class);
    /** Consumer factory (bean picked by the qualifier below) handed to every listener container created in {@code start0}. */
    @Qualifier(value="com.ovopark.jobhub.sdk.client.kafka.KafkaConfig.consumerFactory")
    @Autowired
    ConsumerFactory<String, String> consumerFactory;
    /** Started containers keyed by container id ({@code beanUrl:jobType:group}); filled in {@code start0}, iterated in {@code close}. */
    private final Map<String, TopicPartitions> containers = new ConcurrentHashMap<String, TopicPartitions>();
    /** Used in {@code start0} to look up the task metadata (topic) for a job type and group. */
    @Autowired
    private JobService jobService;
    /** Injected transport; not referenced in the visible part of this class. */
    @Autowired
    private Client2ControlTransport client2ControlTransport;
    /** Supplies the {@link ClientNode} whose app name is put into heartbeat requests. */
    @Autowired
    private ClientNodeProvider clientNodeProvider;
    /**
     * Store created via {@code KVEngine.newTtl("t-p-heartbeat-ttl")}; the listener callbacks keep one
     * {@link TopicPartitionContext} per key {@code "TopicPartitionContext:<containerId>:<partition>"}.
     * NOTE(review): expiry behaviour is defined by KVEngine and is not visible here.
     */
    static final KVEngine.TtlFunc<String> ttlFunc = KVEngine.newTtl((String)"t-p-heartbeat-ttl");
    /** Counter incremented once for every heartbeat request built by the heartbeat task. */
    static final AtomicLong vcc = new AtomicLong();
    /** Single-thread scheduler on which {@code start} re-attempts consumer creation after a failed start. */
    final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(Util.newThreadFactory((String)"create-kafka-consumer"));

    /**
     * Identifies this runner provider.
     *
     * @return the constant provider key {@code "kafka"}
     */
    public String name() {
        final String providerName = "kafka";
        return providerName;
    }

    /**
     * Tries to start the consumer for the given job type and group. When the first attempt does not
     * succeed, a background task is scheduled that repeats the attempt every 10 seconds until
     * {@code start0} reports success.
     *
     * @param jobType job type whose topic should be consumed
     * @param beanUrl bean URL, first component of the container id
     * @param group job group, last component of the container id
     * @param minVer passed through to {@code start0} unchanged
     * @param taskListener listener that will receive the tasks
     * @param taskListenerRunnerProviderConfig consumer tuning options
     * @return {@code true} when the consumer was started (or already existed) on the first attempt,
     *         {@code false} when a retry has been scheduled instead
     */
    public boolean start(final String jobType, final String beanUrl, final String group, final Long minVer, final JobService.TaskListener taskListener, final JobService.TaskListenerRunnerProviderConfig taskListenerRunnerProviderConfig) {
        if (this.start0(jobType, beanUrl, group, minVer, taskListener, taskListenerRunnerProviderConfig)) {
            return true;
        }
        final String retryNotice = "cannot start consumer, schedule a delay task: " + beanUrl + ":" + jobType + ":" + group;
        log.warn(retryNotice);
        final ScheduledExecutorService scheduler = this.scheduledExecutorService;
        Runnable retryTask = new Runnable(){

            @Override
            public void run() {
                try {
                    boolean started = TaskListenerRunnerViaKafkaProvider.this.start0(jobType, beanUrl, group, minVer, taskListener, taskListenerRunnerProviderConfig);
                    if (started) {
                        return;
                    }
                    this.retryLater();
                }
                catch (Exception e) {
                    // A failed attempt must not end the retry loop: log it and try again later.
                    log.error(e.getMessage(), (Throwable)e);
                    this.retryLater();
                }
            }

            /** Re-queues this same task 10 seconds from now and logs the retry. */
            private void retryLater() {
                scheduler.schedule(this, 10L, TimeUnit.SECONDS);
                log.warn(retryNotice);
            }
        };
        scheduler.schedule(retryTask, 10L, TimeUnit.SECONDS);
        return false;
    }

    /**
     * Creates and starts the listener container for {@code beanUrl:jobType:group} unless one is
     * already registered in {@code containers}.
     *
     * <p>The work runs inside {@code Util.lock} keyed by the container id (presumably to serialize
     * concurrent starts of the same container; the helper's semantics are not visible here).
     * The topic is resolved through {@code jobService.taskMetaGet}; when no topic is known yet the
     * method gives up without creating anything.
     *
     * @param jobType job type used for the metadata lookup and passed to the listener
     * @param beanUrl first component of the container id
     * @param group job group used for the metadata lookup, last component of the container id
     * @param minVer currently unused by this method
     * @param taskListener listener wrapped by the Kafka message listener
     * @param taskListenerRunnerProviderConfig consumer group, partitions, commit mode and poll options
     * @return {@code true} when a container exists for the id after the call, {@code false} when the
     *         topic could not be resolved
     */
    private boolean start0(final String jobType, String beanUrl, final String group, Long minVer, final JobService.TaskListener taskListener, final JobService.TaskListenerRunnerProviderConfig taskListenerRunnerProviderConfig) {
        final String containerId = beanUrl + ":" + jobType + ":" + group;
        TopicPartitions topicPartitions = (TopicPartitions)Util.lock((Comparable)((Object)containerId), (Callable)new Callable<TopicPartitions>(){

            @Override
            public TopicPartitions call() throws Exception {
                TopicPartitions existing = TaskListenerRunnerViaKafkaProvider.this.containers.get(containerId);
                if (existing != null) {
                    return existing;
                }
                TaskMetaGetRequest taskMetaGetRequest = new TaskMetaGetRequest();
                taskMetaGetRequest.setType(jobType);
                taskMetaGetRequest.setGroup(group);
                TaskMetaGetResponse taskMetaGetResponse = TaskListenerRunnerViaKafkaProvider.this.jobService.taskMetaGet(taskMetaGetRequest);
                if (taskMetaGetResponse == null || Util.isEmpty((CharSequence)taskMetaGetResponse.getTopic())) {
                    log.info("cannot find topic " + JSONAccessor.impl().format((Object)taskMetaGetResponse) + ", request: " + JSONAccessor.impl().format((Object)taskMetaGetRequest));
                    return null;
                }
                String topic = taskMetaGetResponse.getTopic();
                // Fall back to the container id when no explicit consumer group is configured.
                String consumerGroup = Util.isEmpty((CharSequence)taskListenerRunnerProviderConfig.getConsumerGroup()) ? containerId : taskListenerRunnerProviderConfig.getConsumerGroup();
                ContainerProperties containerProps = TaskListenerRunnerViaKafkaProvider.kafkaContainerPropertiesFor(topic, taskListenerRunnerProviderConfig.getPartitions());
                containerProps.setGroupId(consumerGroup);
                TaskListenerRunnerViaKafkaProvider.applyKafkaConsumerOverrides(containerProps, taskListenerRunnerProviderConfig);
                // Shared between the listener (which looks executors up per partition) and the registry entry.
                Map<String, ScheduledThreadPoolExecutor> heartbeatThread = new ConcurrentHashMap<String, ScheduledThreadPoolExecutor>();
                containerProps.setMessageListener((Object)new PartitionSeekOffsetMessageListener(jobType, taskListenerRunnerProviderConfig, containerId, consumerGroup, taskListener, heartbeatThread));
                TopicPartitions created = new TopicPartitions();
                ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(TaskListenerRunnerViaKafkaProvider.this.consumerFactory, containerProps);
                container.setConcurrency(Math.max(taskListenerRunnerProviderConfig.getConcurrency(), 1));
                // No-op publisher: container events are intentionally discarded.
                container.setApplicationEventPublisher(event -> {});
                container.start();
                created.setConcurrentMessageListenerContainer(container);
                created.setHeartbeatThread(heartbeatThread);
                // Register only after a successful start so a failed start can be retried.
                TaskListenerRunnerViaKafkaProvider.this.containers.put(containerId, created);
                log.info("Started dynamic consumer for topic: " + topic + ", group: " + consumerGroup + ", concurrency: " + container.getConcurrency());
                return created;
            }
        });
        return topicPartitions != null;
    }

    /**
     * Builds container properties that either subscribe to the whole topic or, when a
     * comma-separated partition list is configured, assign exactly those partitions.
     * Entries are trimmed and blank entries are skipped, so {@code "0, 1"} and {@code "0,1,"} work.
     *
     * @param topic topic to consume
     * @param partitions comma-separated partition numbers, or empty/null for a topic subscription
     * @return the container properties for the requested subscription
     * @throws NumberFormatException when a non-blank entry is not an integer
     */
    private static ContainerProperties kafkaContainerPropertiesFor(String topic, String partitions) {
        if (!Util.isNotEmpty((CharSequence)partitions)) {
            return new ContainerProperties(new String[]{topic});
        }
        List<TopicPartitionOffset> assigned = new ArrayList<TopicPartitionOffset>();
        for (String token : partitions.split(",")) {
            String partition = token.trim();
            if (partition.isEmpty()) {
                continue;
            }
            assigned.add(new TopicPartitionOffset(topic, Integer.parseInt(partition)));
        }
        return new ContainerProperties(assigned.toArray(new TopicPartitionOffset[0]));
    }

    /**
     * Copies the per-listener consumer settings into the container's Kafka consumer properties.
     * Values are stored as Strings: {@code java.util.Properties} is specified for String values and
     * {@code getProperty}/{@code stringPropertyNames} do not see Boolean or Integer entries.
     *
     * @param containerProps container properties to update in place
     * @param config listener configuration supplying the overrides
     */
    private static void applyKafkaConsumerOverrides(ContainerProperties containerProps, JobService.TaskListenerRunnerProviderConfig config) {
        if (Util.isNotEmpty((CharSequence)config.getOffsetReset())) {
            containerProps.getKafkaConsumerProperties().put("auto.offset.reset", config.getOffsetReset());
        }
        boolean autoCommit = config.isAutoCommit();
        containerProps.getKafkaConsumerProperties().put("enable.auto.commit", String.valueOf(autoCommit));
        if (!autoCommit) {
            // Offsets are committed only when the listener acknowledges explicitly.
            containerProps.setAckMode(ContainerProperties.AckMode.MANUAL);
        }
        if (config.getPollIntervalTimeSec() > 0) {
            // Convert in long arithmetic so large second values cannot overflow an int.
            containerProps.getKafkaConsumerProperties().put("max.poll.interval.ms", String.valueOf(TimeUnit.SECONDS.toMillis(config.getPollIntervalTimeSec())));
        }
        if (config.getMaxPollRecords() > 0) {
            containerProps.getKafkaConsumerProperties().put("max.poll.records", String.valueOf(config.getMaxPollRecords()));
        }
        containerProps.getKafkaConsumerProperties().put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
    }

    /**
     * Stops every registered listener container and shuts down the heartbeat executors tracked
     * for it, so no heartbeat threads outlive the consumers.
     *
     * <p>Failures are logged per container/executor and never prevent the remaining ones from
     * being closed. The retry scheduler used by {@code start} is left untouched.
     */
    public void close() {
        for (TopicPartitions topicPartitions : this.containers.values()) {
            try {
                ConcurrentMessageListenerContainer<String, String> container = topicPartitions.getConcurrentMessageListenerContainer();
                container.stop();
            }
            catch (Exception e) {
                log.error(e.getMessage(), (Throwable)e);
            }
            Map<String, ScheduledThreadPoolExecutor> heartbeats = topicPartitions.getHeartbeatThread();
            if (heartbeats == null) {
                continue;
            }
            for (ScheduledThreadPoolExecutor heartbeat : heartbeats.values()) {
                try {
                    // shutdown() is a no-op for executors already closed on partition revocation.
                    heartbeat.shutdown();
                }
                catch (Exception e) {
                    log.error(e.getMessage(), (Throwable)e);
                }
            }
        }
    }

    /**
     * Startup callback: registers a shutdown action, under this class's name, that calls
     * {@link #close()}.
     *
     * @param args command-line arguments (ignored)
     * @throws Exception declared by the callback contract
     */
    @Override
    public void run(String ... args) throws Exception {
        Util.CatchRunnable stopListeners = new Util.CatchRunnable(){

            public void run() throws Exception {
                TaskListenerRunnerViaKafkaProvider.this.close();
            }
        };
        String hookName = TaskListenerRunnerViaKafkaProvider.class.getName();
        ShutdownManager.getOrCreate().register(hookName, stopListeners);
        log.info("register shutdown hook, jobhub kafka listener");
    }

    /**
     * Looks up the cached context of one partition of a container.
     *
     * @param containerId container id the partition belongs to
     * @param p partition number as text
     * @return the stored context, or {@code null} when the store has no entry for the key
     */
    TopicPartitionContext get(String containerId, String p) {
        String cacheKey = TopicPartitionContext.KEY_PREFIX + ":" + containerId + ":" + p;
        KVEngine.GetResult cached = ttlFunc.get((Comparable)((Object)cacheKey));
        return cached.exists() ? (TopicPartitionContext)cached.value() : null;
    }

    /**
     * Registry entry for one started consumer: the listener container plus the heartbeat
     * executors the listener keeps per partition key.
     */
    static class TopicPartitions {
        ConcurrentMessageListenerContainer<String, String> concurrentMessageListenerContainer;
        private Map<String, ScheduledThreadPoolExecutor> heartbeatThread = new ConcurrentHashMap<String, ScheduledThreadPoolExecutor>();

        /** @return the listener container, or {@code null} when none was set */
        public ConcurrentMessageListenerContainer<String, String> getConcurrentMessageListenerContainer() {
            return this.concurrentMessageListenerContainer;
        }

        /** @return the heartbeat executors keyed by partition cache key */
        public Map<String, ScheduledThreadPoolExecutor> getHeartbeatThread() {
            return this.heartbeatThread;
        }

        public void setConcurrentMessageListenerContainer(ConcurrentMessageListenerContainer<String, String> concurrentMessageListenerContainer) {
            this.concurrentMessageListenerContainer = concurrentMessageListenerContainer;
        }

        public void setHeartbeatThread(Map<String, ScheduledThreadPoolExecutor> heartbeatThread) {
            this.heartbeatThread = heartbeatThread;
        }

        /** Two entries are equal when both the container and the heartbeat map are equal. */
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TopicPartitions)) {
                return false;
            }
            TopicPartitions that = (TopicPartitions)o;
            return that.canEqual(this)
                && TopicPartitions.sameValue(this.getConcurrentMessageListenerContainer(), that.getConcurrentMessageListenerContainer())
                && TopicPartitions.sameValue(this.getHeartbeatThread(), that.getHeartbeatThread());
        }

        /** Hook that lets subclasses refuse equality with instances of a parent type. */
        protected boolean canEqual(Object other) {
            return other instanceof TopicPartitions;
        }

        public int hashCode() {
            int hash = 1;
            hash = hash * 59 + TopicPartitions.hashOf(this.getConcurrentMessageListenerContainer());
            hash = hash * 59 + TopicPartitions.hashOf(this.getHeartbeatThread());
            return hash;
        }

        public String toString() {
            StringBuilder text = new StringBuilder("TaskListenerRunnerViaKafkaProvider.TopicPartitions(concurrentMessageListenerContainer=");
            text.append((Object)this.getConcurrentMessageListenerContainer());
            text.append(", heartbeatThread=");
            text.append((Object)this.getHeartbeatThread());
            text.append(")");
            return text.toString();
        }

        /** Null-safe equality: two nulls are equal, otherwise delegates to {@code left.equals(right)}. */
        private static boolean sameValue(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash of a possibly-null value; {@code null} contributes 43. */
        private static int hashOf(Object value) {
            return value == null ? 43 : value.hashCode();
        }
    }

    /**
     * State kept for one topic partition of one container. Instances are stored in the TTL store
     * under {@code "TopicPartitionContext:<containerId>:<partition>"} by the listener callbacks.
     *
     * <p>{@code kafkaTaskContext} and {@code heartbeatData} are read by the heartbeat task built in
     * the partition-assigned callback. NOTE(review): that task appears to run on its own
     * {@code ScheduledThreadPoolExecutor} thread while the values are set elsewhere, so both fields
     * are {@code volatile} to make updates visible to it — confirm against the writers.
     *
     * <p>{@code ioThread} and {@code kafkaTaskContext} are transient and take no part in
     * {@code equals}/{@code hashCode}.
     */
    public static class TopicPartitionContext
    implements Model {
        static final String KEY_PREFIX = "TopicPartitionContext";
        private final String topic;
        private final int partition;
        private final String containerId;
        /** Creation time of this context in epoch milliseconds. */
        private final long startTime = System.currentTimeMillis();
        private String tempUniqueId;
        /** Consumer thread that most recently had this partition assigned; cleared on revocation. */
        private transient Thread ioThread;
        /** Context of the task currently tracked for this partition; {@code null} makes the heartbeat task skip its run. */
        transient volatile KafkaTaskContext kafkaTaskContext;
        /** Opaque payload copied into each heartbeat request. */
        private volatile Object heartbeatData;
        private String cancelledTaskIdInES;
        private ProgressStat progressStat;

        /**
         * @param topic topic the partition belongs to
         * @param partition partition number
         * @param containerId id of the owning container
         */
        public TopicPartitionContext(String topic, int partition, String containerId) {
            this.topic = topic;
            this.partition = partition;
            this.containerId = containerId;
        }

        public String getTopic() {
            return this.topic;
        }

        public int getPartition() {
            return this.partition;
        }

        public String getContainerId() {
            return this.containerId;
        }

        public long getStartTime() {
            return this.startTime;
        }

        public String getTempUniqueId() {
            return this.tempUniqueId;
        }

        public Thread getIoThread() {
            return this.ioThread;
        }

        public KafkaTaskContext getKafkaTaskContext() {
            return this.kafkaTaskContext;
        }

        public Object getHeartbeatData() {
            return this.heartbeatData;
        }

        public String getCancelledTaskIdInES() {
            return this.cancelledTaskIdInES;
        }

        public ProgressStat getProgressStat() {
            return this.progressStat;
        }

        public void setTempUniqueId(String tempUniqueId) {
            this.tempUniqueId = tempUniqueId;
        }

        public void setIoThread(Thread ioThread) {
            this.ioThread = ioThread;
        }

        public void setKafkaTaskContext(KafkaTaskContext kafkaTaskContext) {
            this.kafkaTaskContext = kafkaTaskContext;
        }

        public void setHeartbeatData(Object heartbeatData) {
            this.heartbeatData = heartbeatData;
        }

        public void setCancelledTaskIdInES(String cancelledTaskIdInES) {
            this.cancelledTaskIdInES = cancelledTaskIdInES;
        }

        public void setProgressStat(ProgressStat progressStat) {
            this.progressStat = progressStat;
        }

        /**
         * Compares partition, start time, topic, container id, temp unique id, heartbeat data,
         * cancelled task id and progress stat; the transient fields are ignored.
         */
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TopicPartitionContext)) {
                return false;
            }
            TopicPartitionContext that = (TopicPartitionContext)o;
            if (!that.canEqual(this)) {
                return false;
            }
            if (this.getPartition() != that.getPartition()) {
                return false;
            }
            if (this.getStartTime() != that.getStartTime()) {
                return false;
            }
            return TopicPartitionContext.sameValue(this.getTopic(), that.getTopic())
                && TopicPartitionContext.sameValue(this.getContainerId(), that.getContainerId())
                && TopicPartitionContext.sameValue(this.getTempUniqueId(), that.getTempUniqueId())
                && TopicPartitionContext.sameValue(this.getHeartbeatData(), that.getHeartbeatData())
                && TopicPartitionContext.sameValue(this.getCancelledTaskIdInES(), that.getCancelledTaskIdInES())
                && TopicPartitionContext.sameValue(this.getProgressStat(), that.getProgressStat());
        }

        /** Hook that lets subclasses refuse equality with instances of a parent type. */
        protected boolean canEqual(Object other) {
            return other instanceof TopicPartitionContext;
        }

        /** Hashes the same fields, in the same order, that {@link #equals(Object)} compares. */
        public int hashCode() {
            int hash = 1;
            hash = hash * 59 + this.getPartition();
            hash = hash * 59 + Long.hashCode(this.getStartTime());
            hash = hash * 59 + TopicPartitionContext.hashOf(this.getTopic());
            hash = hash * 59 + TopicPartitionContext.hashOf(this.getContainerId());
            hash = hash * 59 + TopicPartitionContext.hashOf(this.getTempUniqueId());
            hash = hash * 59 + TopicPartitionContext.hashOf(this.getHeartbeatData());
            hash = hash * 59 + TopicPartitionContext.hashOf(this.getCancelledTaskIdInES());
            hash = hash * 59 + TopicPartitionContext.hashOf(this.getProgressStat());
            return hash;
        }

        public String toString() {
            return "TaskListenerRunnerViaKafkaProvider.TopicPartitionContext(topic=" + this.getTopic()
                + ", partition=" + this.getPartition()
                + ", containerId=" + this.getContainerId()
                + ", startTime=" + this.getStartTime()
                + ", tempUniqueId=" + this.getTempUniqueId()
                + ", ioThread=" + this.getIoThread()
                + ", kafkaTaskContext=" + this.getKafkaTaskContext()
                + ", heartbeatData=" + this.getHeartbeatData()
                + ", cancelledTaskIdInES=" + this.getCancelledTaskIdInES()
                + ", progressStat=" + this.getProgressStat() + ")";
        }

        /** Null-safe equality: two nulls are equal, otherwise delegates to {@code left.equals(right)}. */
        private static boolean sameValue(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash of a possibly-null value; {@code null} contributes 43. */
        private static int hashOf(Object value) {
            return value == null ? 43 : value.hashCode();
        }
    }

    /**
     * Offset progress of a partition: a start offset, a task offset and an optional
     * sub-progress record.
     */
    public static class ProgressStat
    implements Model {
        private long startOffset;
        private long taskOffset;
        private HeavyPerTopicPartitionTaskListener.SubProgressStat subProgressStat;

        public long getStartOffset() {
            return this.startOffset;
        }

        public long getTaskOffset() {
            return this.taskOffset;
        }

        public HeavyPerTopicPartitionTaskListener.SubProgressStat getSubProgressStat() {
            return this.subProgressStat;
        }

        public void setStartOffset(long startOffset) {
            this.startOffset = startOffset;
        }

        public void setTaskOffset(long taskOffset) {
            this.taskOffset = taskOffset;
        }

        public void setSubProgressStat(HeavyPerTopicPartitionTaskListener.SubProgressStat subProgressStat) {
            this.subProgressStat = subProgressStat;
        }

        /** Equal when both offsets match and the sub-progress records are equal (or both null). */
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ProgressStat)) {
                return false;
            }
            ProgressStat that = (ProgressStat)o;
            if (!that.canEqual(this)) {
                return false;
            }
            if (this.getStartOffset() != that.getStartOffset()) {
                return false;
            }
            if (this.getTaskOffset() != that.getTaskOffset()) {
                return false;
            }
            return ProgressStat.sameValue(this.getSubProgressStat(), that.getSubProgressStat());
        }

        /** Hook that lets subclasses refuse equality with instances of a parent type. */
        protected boolean canEqual(Object other) {
            return other instanceof ProgressStat;
        }

        public int hashCode() {
            int hash = 1;
            hash = hash * 59 + Long.hashCode(this.getStartOffset());
            hash = hash * 59 + Long.hashCode(this.getTaskOffset());
            Object sub = this.getSubProgressStat();
            hash = hash * 59 + (sub == null ? 43 : sub.hashCode());
            return hash;
        }

        public String toString() {
            StringBuilder text = new StringBuilder("TaskListenerRunnerViaKafkaProvider.ProgressStat(startOffset=");
            text.append(this.getStartOffset());
            text.append(", taskOffset=").append(this.getTaskOffset());
            text.append(", subProgressStat=").append((Object)this.getSubProgressStat());
            text.append(")");
            return text.toString();
        }

        /** Null-safe equality: two nulls are equal, otherwise delegates to {@code left.equals(right)}. */
        private static boolean sameValue(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }
    }

    /**
     * Task context for Kafka-delivered tasks; adds a per-task sub-progress record to the base
     * task context. Equality and hashing consider only that sub-progress record.
     */
    public static class KafkaTaskContext
    extends JobServiceImpl.TaskContextImpl
    implements JobService.TaskContext {
        private final HeavyPerTopicPartitionTaskListener.SubProgressStat subProgressStat = new HeavyPerTopicPartitionTaskListener.SubProgressStat();

        /** @return the sub-progress record created together with this context */
        public HeavyPerTopicPartitionTaskListener.SubProgressStat getSubProgressStat() {
            return this.subProgressStat;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof KafkaTaskContext)) {
                return false;
            }
            KafkaTaskContext that = (KafkaTaskContext)((Object)o);
            if (!that.canEqual((Object)this)) {
                return false;
            }
            Object mine = this.getSubProgressStat();
            Object theirs = that.getSubProgressStat();
            if (mine == null) {
                return theirs == null;
            }
            return mine.equals(theirs);
        }

        /** Hook that lets subclasses refuse equality with instances of a parent type. */
        protected boolean canEqual(Object other) {
            return other instanceof KafkaTaskContext;
        }

        public int hashCode() {
            Object sub = this.getSubProgressStat();
            int subHash = sub == null ? 43 : sub.hashCode();
            return 59 + subHash;
        }

        public String toString() {
            return "TaskListenerRunnerViaKafkaProvider.KafkaTaskContext(subProgressStat=" + this.getSubProgressStat() + ")";
        }
    }

    class PartitionSeekOffsetMessageListener
    implements AcknowledgingMessageListener<String, String>,
    ConsumerSeekAware {
        final String jobType;
        final String containerId;
        private final String consumerGroup;
        final JobService.TaskListener taskListener;
        final boolean autoCommit;
        final Map<String, ScheduledThreadPoolExecutor> heartbeatThread;
        private final boolean ignoreTaskIfDone;
        final boolean remoteLogEnabled;

        public PartitionSeekOffsetMessageListener(String jobType, JobService.TaskListenerRunnerProviderConfig taskListenerRunnerProviderConfig, String containerId, String consumerGroup, JobService.TaskListener taskListener, Map<String, ScheduledThreadPoolExecutor> heartbeatThread) {
            this.jobType = jobType;
            this.containerId = containerId;
            this.consumerGroup = consumerGroup;
            this.taskListener = taskListener;
            this.autoCommit = taskListenerRunnerProviderConfig.isAutoCommit();
            this.heartbeatThread = heartbeatThread;
            this.ignoreTaskIfDone = taskListenerRunnerProviderConfig.isIgnoreTaskIfDone();
            this.remoteLogEnabled = taskListenerRunnerProviderConfig.isRemoteLogEnabled();
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            for (TopicPartition partition : partitions) {
                String t = partition.topic();
                int p = partition.partition();
                log.info("partition revoked : " + t + ":" + p);
                String k = "TopicPartitionContext:" + this.containerId + ":" + p;
                ttlFunc.putAndGet((Comparable)((Object)k), getResult -> {
                    if (!getResult.exists()) {
                        log.info(t + ":" + p + " ,  revoked ,partition cache is missing , ignore , not assigned???: " + k);
                        return null;
                    }
                    TopicPartitionContext topicPartitionContext = (TopicPartitionContext)getResult.value();
                    if (topicPartitionContext.getIoThread() == Thread.currentThread()) {
                        topicPartitionContext.setIoThread(null);
                        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = this.heartbeatThread.get(k);
                        try {
                            if (scheduledThreadPoolExecutor != null) {
                                scheduledThreadPoolExecutor.shutdown();
                            }
                            log.info("partition revoked, close heartbeat thread: " + t + ":" + p);
                        }
                        catch (Exception e) {
                            log.error(e.getMessage(), (Throwable)e);
                        }
                    }
                    return topicPartitionContext;
                });
            }
        }

        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
            for (TopicPartition partition : assignments.keySet()) {
                String t = partition.topic();
                final int p = partition.partition();
                log.info("partition assigned : " + t + ":" + p);
                final String k = "TopicPartitionContext:" + this.containerId + ":" + p;
                ttlFunc.putAndGet((Comparable)((Object)k), getResult -> {
                    TopicPartitionContext topicPartitionContext = (TopicPartitionContext)getResult.value();
                    if (!getResult.exists() || getResult.value() == null) {
                        topicPartitionContext = new TopicPartitionContext(t, p, this.containerId);
                    }
                    Thread beforeIoThread = topicPartitionContext.getIoThread();
                    topicPartitionContext.setIoThread(Thread.currentThread());
                    if (beforeIoThread != Thread.currentThread()) {
                        topicPartitionContext.setTempUniqueId(Util.uniqueFirstPart());
                        topicPartitionContext.setProgressStat(new ProgressStat());
                        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = this.heartbeatThread.get(k);
                        try {
                            if (scheduledThreadPoolExecutor != null) {
                                scheduledThreadPoolExecutor.shutdown();
                            }
                            log.info("partition assigned, first close heartbeat thread: " + t + ":" + p);
                        }
                        catch (Exception e) {
                            log.error(e.getMessage(), (Throwable)e);
                        }
                        log.info("partition assigned, to start heartbeat thread: " + t + ":" + p);
                        ScheduledThreadPoolExecutor taskHeartbeat = new ScheduledThreadPoolExecutor(1, Util.newThreadFactory((String)"tmp"), new ThreadPoolExecutor.CallerRunsPolicy());
                        taskHeartbeat.setKeepAliveTime(60L, TimeUnit.SECONDS);
                        taskHeartbeat.allowCoreThreadTimeOut(true);
                        final TopicPartitionContext finalTopicPartitionContext = topicPartitionContext;
                        Runnable runnable = Util.catchRunnable((Util.CatchRunnable)new Util.CatchRunnable(){

                            public void run() throws Exception {
                                ClientNode clientNode = TaskListenerRunnerViaKafkaProvider.this.clientNodeProvider.clientNode();
                                KafkaTaskContext kafkaTaskContext = finalTopicPartitionContext.kafkaTaskContext;
                                if (kafkaTaskContext == null) {
                                    return;
                                }
                                HeavyPerTopicPartitionTaskListener.SubProgressStat subProgressStat = kafkaTaskContext.getSubProgressStat();
                                TaskGetResponse.Task task = kafkaTaskContext.task();
                                TaskHeartbeatRequest taskHeartbeatRequest = new TaskHeartbeatRequest();
                                taskHeartbeatRequest.setTaskIdInES(task.getId());
                                taskHeartbeatRequest.setJobIdInES(task.getJobId());
                                taskHeartbeatRequest.setApp(clientNode.app());
                                taskHeartbeatRequest.setNode(ClientNode.UUID_STR);
                                taskHeartbeatRequest.setVcc(vcc.incrementAndGet());
                                taskHeartbeatRequest.setContainerId(PartitionSeekOffsetMessageListener.this.containerId);
                                taskHeartbeatRequest.setPartition(p);
                                taskHeartbeatRequest.setConsumerGroup(PartitionSeekOffsetMessageListener.this.consumerGroup);
                                taskHeartbeatRequest.setTempUniqueId(finalTopicPartitionContext.getTempUniqueId());
                                taskHeartbeatRequest.setTaskStartTimeStr(kafkaTaskContext.getTaskStartTimeStr());
                                taskHeartbeatRequest.setTask((Object)task);
                                taskHeartbeatRequest.setHeartbeatData(finalTopicPartitionContext.getHeartbeatData());
                                ProgressStat progressStat = new ProgressStat();
                                BeanUtils.copyProperties((Object)finalTopicPartitionContext.getProgressStat(), (Object)progressStat);
                                progressStat.setSubProgressStat(subProgressStat);
                                taskHeartbeatRequest.setProgressStat((Object)progressStat);
                                try {
                                    log.info(k + ", heartbeat, request " + JSONAccessor.impl().format((Object)taskHeartbeatRequest));
                                    TaskHeartbeatResponse taskHeartbeatResponse = TaskListenerRunnerViaKafkaProvider.this.client2ControlTransport.heartbeatStat(taskHeartbeatRequest);
                                    if (taskHeartbeatResponse == null) {
                                        finalTopicPartitionContext.setCancelledTaskIdInES(null);
                                    } else {
                                        finalTopicPartitionContext.setCancelledTaskIdInES(taskHeartbeatResponse.getCancelledTaskIdInES());
                                    }
                                    log.info(k + ", heartbeat, response " + JSONAccessor.impl().format((Object)taskHeartbeatResponse));
                                }
                                catch (Exception e) {
                                    log.error(e.getMessage(), (Throwable)e);
                                }
                            }
                        });
                        taskHeartbeat.scheduleWithFixedDelay(runnable, 0L, 5L, TimeUnit.SECONDS);
                        this.heartbeatThread.put(k, taskHeartbeat);
                        log.info("partition assigned,start heartbeat thread: " + t + ":" + p);
                    }
                    return topicPartitionContext;
                });
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Processes a single task record delivered for a partition of this container.
         *
         * <p>The method looks up the {@code TopicPartitionContext} stored under
         * {@code "TopicPartitionContext:" + containerId + ":" + partition}, records the record
         * offset in its progress statistics, deserializes the record value into a
         * {@code TaskGetResponse.Task}, publishes a fresh {@code KafkaTaskContext} on the
         * partition context, optionally skips the task when {@code taskIfDone} reports it as
         * done, and otherwise invokes the task listener. Afterwards the task status is reported
         * (unless the listener manages the status manually), the collected task log is uploaded
         * and the job log is flushed; each of those three steps logs and swallows its own
         * {@link Exception} so that one failing step does not prevent the next.
         *
         * <p>Any {@link Exception} raised while processing is logged and not rethrown. In all
         * cases the task context is cleared from the partition context, the record is
         * acknowledged when auto commit is disabled, and the MDC keys are removed.
         *
         * @param record         the Kafka record whose value is the JSON form of the task
         * @param acknowledgment handle used to acknowledge the record when auto commit is off
         * @throws IllegalStateException if no partition context is stored for the record's
         *                               partition; the record is not acknowledged in that case
         */
        public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
            String t = record.topic();
            int p = record.partition();
            String contextKey = "TopicPartitionContext:" + this.containerId + ":" + p;
            TopicPartitionContext topicPartitionContext = (TopicPartitionContext)ttlFunc.get((Comparable)((Object)contextKey)).value();
            if (topicPartitionContext == null) {
                // Previously this surfaced as an NPE that was logged and then thrown a second
                // time from the finally block. Fail with an explicit message instead; as before,
                // the record is left unacknowledged.
                throw new IllegalStateException("no partition context for key " + contextKey + ", topic: " + t + ", offset: " + record.offset());
            }
            try {
                ProgressStat progressStat = topicPartitionContext.getProgressStat();
                // NOTE(review): a start offset of 0 is treated as "not set yet", so a partition
                // whose first record really is at offset 0 gets its start offset replaced by the
                // next record's offset - confirm whether that is intended.
                if (progressStat.getStartOffset() == 0L) {
                    progressStat.setStartOffset(record.offset());
                }
                progressStat.setTaskOffset(record.offset());
                String value = record.value();
                TaskGetResponse.Task task = (TaskGetResponse.Task)JSONAccessor.impl().read(value, TaskGetResponse.Task.class);
                String taskKey = "_job4kafka_" + task.getJobId() + "_task_" + task.getId();
                JobLogImpl jobLog = this.remoteLogEnabled ? new JobLogImpl(TaskListenerRunnerViaKafkaProvider.this.client2ControlTransport, taskKey, 0L) : JobLog.noopIfNull(null);
                KafkaTaskContext kafkaTaskContext = new KafkaTaskContext();
                kafkaTaskContext.setTask(task);
                kafkaTaskContext.setHeartbeatDataListener(topicPartitionContext::setHeartbeatData);
                // The task counts as cancelled when its id equals the cancelled task id kept on
                // the partition context.
                kafkaTaskContext.setCancelledSupplier((Supplier)((ServiceProvider)() -> Util.compare2((Comparable)((Object)task.getId()), (Comparable)((Object)topicPartitionContext.getCancelledTaskIdInES())) == 0));
                kafkaTaskContext.setJobLog((JobLog)jobLog);
                // Publish the running task on the partition context; it is cleared in the outer finally.
                topicPartitionContext.setKafkaTaskContext(kafkaTaskContext);
                if (this.ignoreTaskIfDone) {
                    TaskIfDoneRequest taskIfDoneRequest = new TaskIfDoneRequest();
                    taskIfDoneRequest.setTaskIdInES(task.getId());
                    taskIfDoneRequest.setConsumerGroup(this.consumerGroup);
                    TaskIfDoneResponse taskIfDoneResponse = TaskListenerRunnerViaKafkaProvider.this.client2ControlTransport.taskIfDone(taskIfDoneRequest);
                    if (taskIfDoneResponse != null && taskIfDoneResponse.isDone()) {
                        log.info("cancel task: " + JSONAccessor.impl().format((Object)taskIfDoneResponse));
                        // The outer finally still clears the context and acknowledges the record.
                        return;
                    }
                }
                TaskUpdateRequest taskUpdateRequest = new TaskUpdateRequest();
                MDC.put("traceId", taskKey);
                MDC.put("requestId", taskKey);
                long startMs = System.currentTimeMillis();
                // Start pessimistic: only a normal return from the listener may report success.
                // With COMPLETED as the initial value, a listener that died with an Error (which
                // the catch below does not handle) was reported as COMPLETED from the finally block.
                JobStatus jobStatus = JobStatus.FAIL;
                try {
                    this.taskListener.on((JobService.TaskContext)kafkaTaskContext);
                    jobStatus = (JobStatus)Util.convert2Self((Object)kafkaTaskContext.getJobStatus(), (Object)JobStatus.COMPLETED);
                }
                catch (Exception e) {
                    log.error(e.getMessage(), (Throwable)e);
                    jobStatus = JobStatus.FAIL;
                    taskUpdateRequest.setCompletedDesc(e.getMessage());
                }
                finally {
                    if (!kafkaTaskContext.isStatusManageManually()) {
                        try {
                            taskUpdateRequest.setId(task.getId());
                            taskUpdateRequest.setDocIndexName(task.getDocIndexName());
                            taskUpdateRequest.setStatus(jobStatus.name());
                            // A description set on the task context overrides the exception message.
                            if (Util.isNotEmpty((CharSequence)kafkaTaskContext.getCompletedDesc())) {
                                taskUpdateRequest.setCompletedDesc(kafkaTaskContext.getCompletedDesc());
                            }
                            taskUpdateRequest.setRequestDeviceUrl(kafkaTaskContext.getRequestDeviceUrl());
                            taskUpdateRequest.setRequestDeviceArgs(kafkaTaskContext.getRequestDeviceArgs());
                            taskUpdateRequest.setResponseFromDevice(kafkaTaskContext.getResponseFromDevice());
                            taskUpdateRequest.setRetryCount(kafkaTaskContext.task().getRetryCount());
                            TaskUpdateResponse taskUpdateResponse = TaskListenerRunnerViaKafkaProvider.this.jobService.taskUpdate(taskUpdateRequest);
                            log.info("taskUpdate: " + JSONAccessor.impl().format((Object)taskUpdateResponse));
                        }
                        catch (Exception e) {
                            log.error(e.getMessage(), (Throwable)e);
                        }
                    }
                    try {
                        kafkaTaskContext.append0(taskKey + " , cost time: " + Util.costTime(System.currentTimeMillis() - startMs) + ", start: " + Util.formatTime((LocalDateTime)Util.dateTime(startMs), new String[0]));
                        List contentList = kafkaTaskContext.getContentList();
                        TaskLogPutRequest taskLogPutRequest = new TaskLogPutRequest();
                        taskLogPutRequest.setJobId(task.getJobId());
                        taskLogPutRequest.setTaskId(task.getId());
                        taskLogPutRequest.setType(this.jobType);
                        taskLogPutRequest.setContentList(contentList);
                        TaskLogPutResponse taskLogPutResponse = TaskListenerRunnerViaKafkaProvider.this.jobService.taskLog(taskLogPutRequest);
                        log.info("taskLog: " + JSONAccessor.impl().format((Object)taskLogPutResponse));
                    }
                    catch (Exception e) {
                        log.error(e.getMessage(), (Throwable)e);
                    }
                    try {
                        jobLog.flush(true);
                    }
                    catch (Exception e) {
                        log.error(e.getMessage(), (Throwable)e);
                    }
                    log.info(taskKey + " , cost time: " + Util.costTime(System.currentTimeMillis() - startMs) + ", start: " + Util.formatTime((LocalDateTime)Util.dateTime(startMs), new String[0]));
                }
            }
            catch (Exception e) {
                log.error("Error processing record from topic: {}", (Object)t, (Object)e);
            }
            finally {
                try {
                    topicPartitionContext.setKafkaTaskContext(null);
                    if (!this.autoCommit) {
                        acknowledgment.acknowledge();
                    }
                }
                finally {
                    // Always drop the MDC keys, even if the acknowledge call throws, so they do
                    // not stay attached to this thread for the next record.
                    MDC.remove("traceId");
                    MDC.remove("requestId");
                }
            }
        }
    }
}

