/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.jobhub.sdk.client.kafka;

import com.ovopark.jobhub.sdk.client.Client2ControlTransport;
import com.ovopark.jobhub.sdk.client.ClientNodeProvider;
import com.ovopark.jobhub.sdk.client.JobService;
import com.ovopark.jobhub.sdk.client.JobServiceImpl;
import com.ovopark.jobhub.sdk.model.JobStatus;
import com.ovopark.jobhub.sdk.model.TaskGetResponse;
import com.ovopark.jobhub.sdk.model.TaskHeartbeatRequest;
import com.ovopark.jobhub.sdk.model.TaskHeartbeatResponse;
import com.ovopark.jobhub.sdk.model.TaskLogPutRequest;
import com.ovopark.jobhub.sdk.model.TaskLogPutResponse;
import com.ovopark.jobhub.sdk.model.TaskMetaGetRequest;
import com.ovopark.jobhub.sdk.model.TaskMetaGetResponse;
import com.ovopark.jobhub.sdk.model.TaskUpdateRequest;
import com.ovopark.jobhub.sdk.model.TaskUpdateResponse;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.Model;
import com.ovopark.kernel.shared.ShutdownManager;
import com.ovopark.kernel.shared.Util;
import com.ovopark.kernel.shared.kv.KVEngine;
import com.ovopark.kernel.shared.vclient.ClientNode;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class TaskListenerRunnerViaKafkaProvider
implements JobService.TaskListenerRunnerProvider,
CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(TaskListenerRunnerViaKafkaProvider.class);
    @Qualifier(value="com.ovopark.jobhub.sdk.client.kafka.KafkaConfig.consumerFactory")
    @Autowired
    ConsumerFactory<String, String> consumerFactory;
    private final Map<String, TopicPartitions> containers = new ConcurrentHashMap<String, TopicPartitions>();
    @Autowired
    private JobService jobService;
    @Autowired
    private Client2ControlTransport client2ControlTransport;
    @Autowired
    private ClientNodeProvider clientNodeProvider;
    static final KVEngine.TtlFunc<String> ttlFunc = KVEngine.newTtl((String)"t-p-heartbeat-ttl");
    static final AtomicLong vcc = new AtomicLong();
    final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(Util.newThreadFactory((String)"create-kafka-consumer"));

    public String name() {
        return "kafka";
    }

    public boolean start(final String jobType, final String beanUrl, final String group, final Long minVer, final JobService.TaskListener taskListener, final JobService.TaskListenerRunnerProviderConfig taskListenerRunnerProviderConfig) {
        boolean f = this.start0(jobType, beanUrl, group, minVer, taskListener, taskListenerRunnerProviderConfig);
        if (!f) {
            log.warn("cannot start consumer, schedule a delay task: " + beanUrl + ":" + jobType + ":" + group);
            this.scheduledExecutorService.schedule(new Runnable(){

                @Override
                public void run() {
                    try {
                        boolean f = TaskListenerRunnerViaKafkaProvider.this.start0(jobType, beanUrl, group, minVer, taskListener, taskListenerRunnerProviderConfig);
                        if (!f) {
                            TaskListenerRunnerViaKafkaProvider.this.scheduledExecutorService.schedule(this, 10L, TimeUnit.SECONDS);
                            log.warn("cannot start consumer, schedule a delay task: " + beanUrl + ":" + jobType + ":" + group);
                        }
                    }
                    catch (Exception e) {
                        log.error(e.getMessage(), (Throwable)e);
                        TaskListenerRunnerViaKafkaProvider.this.scheduledExecutorService.schedule(this, 10L, TimeUnit.SECONDS);
                        log.warn("cannot start consumer, schedule a delay task: " + beanUrl + ":" + jobType + ":" + group);
                    }
                }
            }, 10L, TimeUnit.SECONDS);
        }
        return f;
    }

    private boolean start0(final String jobType, String beanUrl, final String group, Long minVer, final JobService.TaskListener taskListener, final JobService.TaskListenerRunnerProviderConfig taskListenerRunnerProviderConfig) {
        final String containerId = beanUrl + ":" + jobType + ":" + group;
        TopicPartitions topicPartitions = (TopicPartitions)Util.lock((Comparable)((Object)containerId), (Callable)new Callable<TopicPartitions>(){

            @Override
            public TopicPartitions call() throws Exception {
                TopicPartitions tps = TaskListenerRunnerViaKafkaProvider.this.containers.get(containerId);
                if (tps != null) {
                    return tps;
                }
                TaskMetaGetRequest taskMetaGetRequest = new TaskMetaGetRequest();
                taskMetaGetRequest.setType(jobType);
                taskMetaGetRequest.setGroup(group);
                TaskMetaGetResponse taskMetaGetResponse = TaskListenerRunnerViaKafkaProvider.this.jobService.taskMetaGet(taskMetaGetRequest);
                if (taskMetaGetResponse == null || Util.isEmpty((CharSequence)taskMetaGetResponse.getTopic())) {
                    log.info("cannot find topic " + JSONAccessor.impl().format((Object)taskMetaGetResponse) + ", request: " + JSONAccessor.impl().format((Object)taskMetaGetRequest));
                    return null;
                }
                final String topic = taskMetaGetResponse.getTopic();
                String groupId = Util.isEmpty((CharSequence)taskListenerRunnerProviderConfig.getConsumerGroup()) ? containerId : taskListenerRunnerProviderConfig.getConsumerGroup();
                ContainerProperties containerProps = new ContainerProperties(new String[]{topic});
                containerProps.setGroupId(groupId);
                if (Util.isNotEmpty((CharSequence)taskListenerRunnerProviderConfig.getOffsetReset())) {
                    containerProps.getKafkaConsumerProperties().put("auto.offset.reset", taskListenerRunnerProviderConfig.getOffsetReset());
                }
                final boolean autoCommit = taskListenerRunnerProviderConfig.isAutoCommit();
                containerProps.getKafkaConsumerProperties().put("enable.auto.commit", (Object)autoCommit);
                if (!autoCommit) {
                    containerProps.setAckMode(ContainerProperties.AckMode.MANUAL);
                }
                if (taskListenerRunnerProviderConfig.getPollIntervalTimeSec() > 0) {
                    containerProps.getKafkaConsumerProperties().put("max.poll.interval.ms", (Object)(taskListenerRunnerProviderConfig.getPollIntervalTimeSec() * 1000));
                }
                if (taskListenerRunnerProviderConfig.getMaxPollRecords() > 0) {
                    containerProps.getKafkaConsumerProperties().put("max.poll.records", (Object)taskListenerRunnerProviderConfig.getMaxPollRecords());
                }
                final ConcurrentHashMap<String, ScheduledThreadPoolExecutor> heartbeatThread = new ConcurrentHashMap<String, ScheduledThreadPoolExecutor>();
                containerProps.setConsumerRebalanceListener(new ConsumerRebalanceListener(){

                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        for (TopicPartition partition : partitions) {
                            String t = partition.topic();
                            int p = partition.partition();
                            String key = 1.key0(t, p);
                            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor)heartbeatThread.get(key);
                            try {
                                if (scheduledThreadPoolExecutor != null) {
                                    scheduledThreadPoolExecutor.shutdown();
                                }
                                log.info("partition revoked, close heartbeat thread: " + t + ":" + p);
                            }
                            catch (Exception e) {
                                log.error(e.getMessage(), (Throwable)e);
                            }
                        }
                    }

                    private static String key0(String t, int p) {
                        return t + ":" + p;
                    }

                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        for (TopicPartition partition : partitions) {
                            final String t = partition.topic();
                            final int p = partition.partition();
                            String key = 1.key0(t, p);
                            ScheduledThreadPoolExecutor taskHeartbeat = new ScheduledThreadPoolExecutor(1, Util.newThreadFactory((String)"tmp"), new ThreadPoolExecutor.CallerRunsPolicy());
                            taskHeartbeat.setKeepAliveTime(60L, TimeUnit.SECONDS);
                            taskHeartbeat.allowCoreThreadTimeOut(true);
                            Runnable runnable = Util.catchRunnable((Util.CatchRunnable)new Util.CatchRunnable(){

                                public void run() throws Exception {
                                    String k = "RecordInfo:" + t + ":" + p;
                                    KVEngine.GetResult getResult = ttlFunc.get((Comparable)((Object)k));
                                    if (!getResult.exists()) {
                                        return;
                                    }
                                    ClientNode clientNode = TaskListenerRunnerViaKafkaProvider.this.clientNodeProvider.clientNode();
                                    RecordInfo recordInfo = (RecordInfo)getResult.value();
                                    TaskGetResponse.Task task = recordInfo.getTask();
                                    if (task == null) {
                                        return;
                                    }
                                    TaskHeartbeatRequest taskHeartbeatRequest = new TaskHeartbeatRequest();
                                    taskHeartbeatRequest.setTaskIdInES(task.getId());
                                    taskHeartbeatRequest.setJobIdInES(task.getJobId());
                                    taskHeartbeatRequest.setApp(clientNode.app());
                                    taskHeartbeatRequest.setNode(ClientNode.UUID_STR);
                                    taskHeartbeatRequest.setVcc(vcc.incrementAndGet());
                                    taskHeartbeatRequest.setTask((Object)task);
                                    taskHeartbeatRequest.setHeartbeatData(recordInfo.getHeartbeatData());
                                    try {
                                        log.info(k + ", heartbeat, request " + JSONAccessor.impl().format((Object)taskHeartbeatRequest));
                                        TaskHeartbeatResponse taskHeartbeatResponse = TaskListenerRunnerViaKafkaProvider.this.client2ControlTransport.heartbeatStat(taskHeartbeatRequest);
                                        log.info(k + ", heartbeat, response " + JSONAccessor.impl().format((Object)taskHeartbeatResponse));
                                    }
                                    catch (Exception e) {
                                        log.error(e.getMessage(), (Throwable)e);
                                    }
                                }
                            });
                            taskHeartbeat.scheduleWithFixedDelay(runnable, 0L, 5L, TimeUnit.SECONDS);
                            heartbeatThread.put(key, taskHeartbeat);
                            log.info("partition assigned,start heartbeat thread: " + t + ":" + p);
                        }
                    }
                });
                containerProps.setMessageListener((Object)new AcknowledgingMessageListener<String, String>(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
                        String t = record.topic();
                        int p = record.partition();
                        try {
                            String value = (String)record.value();
                            TaskGetResponse.Task task = (TaskGetResponse.Task)JSONAccessor.impl().read(value, TaskGetResponse.Task.class);
                            RecordInfo ri = (RecordInfo)ttlFunc.putAndGet((Comparable)((Object)("RecordInfo:" + t + ":" + p)), getResult -> {
                                if (!getResult.exists()) {
                                    RecordInfo recordInfo = new RecordInfo();
                                    recordInfo.setTask(task);
                                    return recordInfo;
                                }
                                RecordInfo recordInfo = (RecordInfo)getResult.value();
                                recordInfo.setTask(task);
                                return recordInfo;
                            }).value();
                            TaskUpdateRequest taskUpdateRequest = new TaskUpdateRequest();
                            taskUpdateRequest.setId(task.getId());
                            JobServiceImpl.TaskContextImpl taskContext = new JobServiceImpl.TaskContextImpl();
                            taskContext.setTask(task);
                            taskContext.setHeartbeatDataListener(ri::setHeartbeatData);
                            String taskKey = "_job4kafka_" + task.getJobId() + "_task_" + task.getId();
                            MDC.put((String)"traceId", (String)taskKey);
                            MDC.put((String)"requestId", (String)taskKey);
                            JobStatus jobStatus = JobStatus.COMPLETED;
                            try {
                                taskListener.on((JobService.TaskContext)taskContext);
                                jobStatus = (JobStatus)Util.convert2Self((Object)taskContext.getJobStatus(), (Object)JobStatus.COMPLETED);
                            }
                            catch (Exception e) {
                                log.error(e.getMessage(), (Throwable)e);
                                jobStatus = JobStatus.FAIL;
                                taskUpdateRequest.setCompletedDesc(e.getMessage());
                            }
                            finally {
                                if (!taskContext.isStatusManageManually()) {
                                    taskUpdateRequest.setId(task.getId());
                                    taskUpdateRequest.setDocIndexName(task.getDocIndexName());
                                    taskUpdateRequest.setStatus(jobStatus.name());
                                    if (Util.isNotEmpty((CharSequence)taskContext.getCompletedDesc())) {
                                        taskUpdateRequest.setCompletedDesc(taskContext.getCompletedDesc());
                                    }
                                    taskUpdateRequest.setRequestDeviceUrl(taskContext.getRequestDeviceUrl());
                                    taskUpdateRequest.setRequestDeviceArgs(taskContext.getRequestDeviceArgs());
                                    taskUpdateRequest.setResponseFromDevice(taskContext.getResponseFromDevice());
                                    taskUpdateRequest.setRetryCount(taskContext.task().getRetryCount());
                                    TaskUpdateResponse taskUpdateResponse = TaskListenerRunnerViaKafkaProvider.this.jobService.taskUpdate(taskUpdateRequest);
                                    log.info("taskUpdate: " + JSONAccessor.impl().format((Object)taskUpdateResponse));
                                }
                                List contentList = taskContext.getContentList();
                                TaskLogPutRequest taskLogPutRequest = new TaskLogPutRequest();
                                taskLogPutRequest.setJobId(task.getJobId());
                                taskLogPutRequest.setTaskId(task.getId());
                                taskLogPutRequest.setType(jobType);
                                taskLogPutRequest.setContentList(contentList);
                                TaskLogPutResponse taskLogPutResponse = TaskListenerRunnerViaKafkaProvider.this.jobService.taskLog(taskLogPutRequest);
                                log.info("taskLog: " + JSONAccessor.impl().format((Object)taskLogPutResponse));
                            }
                        }
                        catch (Exception e) {
                            log.error("Error processing record from topic: {}", (Object)topic, (Object)e);
                        }
                        finally {
                            ttlFunc.putAndGet((Comparable)((Object)("RecordInfo:" + t + ":" + p)), getResult -> {
                                RecordInfo recordInfo = (RecordInfo)getResult.value();
                                recordInfo.setTask(null);
                                return recordInfo;
                            });
                            if (!autoCommit) {
                                acknowledgment.acknowledge();
                            }
                            MDC.remove((String)"traceId");
                            MDC.remove((String)"requestId");
                        }
                    }
                });
                TopicPartitions topicPartitions = new TopicPartitions();
                ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(TaskListenerRunnerViaKafkaProvider.this.consumerFactory, containerProps);
                container.setConcurrency(Math.max(taskListenerRunnerProviderConfig.getConcurrency(), 1));
                container.setApplicationEventPublisher(event -> {});
                container.start();
                topicPartitions.setConcurrentMessageListenerContainer((ConcurrentMessageListenerContainer<String, String>)container);
                topicPartitions.setHeartbeatThread(heartbeatThread);
                TaskListenerRunnerViaKafkaProvider.this.containers.put(containerId, topicPartitions);
                log.info("Started dynamic consumer for topic: " + topic + ", group: " + groupId + ", concurrency: " + container.getConcurrency());
                return topicPartitions;
            }
        });
        return topicPartitions != null;
    }

    public void close() {
        for (TopicPartitions topicPartitions : this.containers.values()) {
            try {
                ConcurrentMessageListenerContainer<String, String> container = topicPartitions.getConcurrentMessageListenerContainer();
                container.stop();
            }
            catch (Exception e) {
                log.error(e.getMessage(), (Throwable)e);
            }
        }
    }

    public void run(String ... args) throws Exception {
        ShutdownManager.getOrCreate().register(TaskListenerRunnerViaKafkaProvider.class.getName(), new Util.CatchRunnable(){

            public void run() throws Exception {
                TaskListenerRunnerViaKafkaProvider.this.close();
            }
        });
        log.info("register shutdown hook, jobhub kafka listener");
    }

    static class TopicPartitions {
        ConcurrentMessageListenerContainer<String, String> concurrentMessageListenerContainer;
        private Map<String, ScheduledThreadPoolExecutor> heartbeatThread = new ConcurrentHashMap<String, ScheduledThreadPoolExecutor>();

        public ConcurrentMessageListenerContainer<String, String> getConcurrentMessageListenerContainer() {
            return this.concurrentMessageListenerContainer;
        }

        public Map<String, ScheduledThreadPoolExecutor> getHeartbeatThread() {
            return this.heartbeatThread;
        }

        public void setConcurrentMessageListenerContainer(ConcurrentMessageListenerContainer<String, String> concurrentMessageListenerContainer) {
            this.concurrentMessageListenerContainer = concurrentMessageListenerContainer;
        }

        public void setHeartbeatThread(Map<String, ScheduledThreadPoolExecutor> heartbeatThread) {
            this.heartbeatThread = heartbeatThread;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof TopicPartitions)) {
                return false;
            }
            TopicPartitions other = (TopicPartitions)o;
            if (!other.canEqual(this)) {
                return false;
            }
            ConcurrentMessageListenerContainer<String, String> this$concurrentMessageListenerContainer = this.getConcurrentMessageListenerContainer();
            ConcurrentMessageListenerContainer<String, String> other$concurrentMessageListenerContainer = other.getConcurrentMessageListenerContainer();
            if (this$concurrentMessageListenerContainer == null ? other$concurrentMessageListenerContainer != null : !this$concurrentMessageListenerContainer.equals(other$concurrentMessageListenerContainer)) {
                return false;
            }
            Map<String, ScheduledThreadPoolExecutor> this$heartbeatThread = this.getHeartbeatThread();
            Map<String, ScheduledThreadPoolExecutor> other$heartbeatThread = other.getHeartbeatThread();
            return !(this$heartbeatThread == null ? other$heartbeatThread != null : !((Object)this$heartbeatThread).equals(other$heartbeatThread));
        }

        protected boolean canEqual(Object other) {
            return other instanceof TopicPartitions;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            ConcurrentMessageListenerContainer<String, String> $concurrentMessageListenerContainer = this.getConcurrentMessageListenerContainer();
            result = result * 59 + ($concurrentMessageListenerContainer == null ? 43 : $concurrentMessageListenerContainer.hashCode());
            Map<String, ScheduledThreadPoolExecutor> $heartbeatThread = this.getHeartbeatThread();
            result = result * 59 + ($heartbeatThread == null ? 43 : ((Object)$heartbeatThread).hashCode());
            return result;
        }

        public String toString() {
            return "TaskListenerRunnerViaKafkaProvider.TopicPartitions(concurrentMessageListenerContainer=" + String.valueOf(this.getConcurrentMessageListenerContainer()) + ", heartbeatThread=" + String.valueOf(this.getHeartbeatThread()) + ")";
        }
    }

    static class RecordInfo
    implements Model {
        static final String KEY_PREFIX = "RecordInfo";
        private String topic;
        private int partition;
        TaskGetResponse.Task task;
        private Object heartbeatData;

        public String getTopic() {
            return this.topic;
        }

        public int getPartition() {
            return this.partition;
        }

        public TaskGetResponse.Task getTask() {
            return this.task;
        }

        public Object getHeartbeatData() {
            return this.heartbeatData;
        }

        public void setTopic(String topic) {
            this.topic = topic;
        }

        public void setPartition(int partition) {
            this.partition = partition;
        }

        public void setTask(TaskGetResponse.Task task) {
            this.task = task;
        }

        public void setHeartbeatData(Object heartbeatData) {
            this.heartbeatData = heartbeatData;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof RecordInfo)) {
                return false;
            }
            RecordInfo other = (RecordInfo)o;
            if (!other.canEqual(this)) {
                return false;
            }
            if (this.getPartition() != other.getPartition()) {
                return false;
            }
            String this$topic = this.getTopic();
            String other$topic = other.getTopic();
            if (this$topic == null ? other$topic != null : !this$topic.equals(other$topic)) {
                return false;
            }
            TaskGetResponse.Task this$task = this.getTask();
            TaskGetResponse.Task other$task = other.getTask();
            if (this$task == null ? other$task != null : !this$task.equals(other$task)) {
                return false;
            }
            Object this$heartbeatData = this.getHeartbeatData();
            Object other$heartbeatData = other.getHeartbeatData();
            return !(this$heartbeatData == null ? other$heartbeatData != null : !this$heartbeatData.equals(other$heartbeatData));
        }

        protected boolean canEqual(Object other) {
            return other instanceof RecordInfo;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            result = result * 59 + this.getPartition();
            String $topic = this.getTopic();
            result = result * 59 + ($topic == null ? 43 : $topic.hashCode());
            TaskGetResponse.Task $task = this.getTask();
            result = result * 59 + ($task == null ? 43 : $task.hashCode());
            Object $heartbeatData = this.getHeartbeatData();
            result = result * 59 + ($heartbeatData == null ? 43 : $heartbeatData.hashCode());
            return result;
        }

        public String toString() {
            return "TaskListenerRunnerViaKafkaProvider.RecordInfo(topic=" + this.getTopic() + ", partition=" + this.getPartition() + ", task=" + String.valueOf(this.getTask()) + ", heartbeatData=" + String.valueOf(this.getHeartbeatData()) + ")";
        }
    }
}

