/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.jobhub.sdk.client.kafka;

import com.ovopark.jobhub.sdk.client.JobService;
import com.ovopark.jobhub.sdk.client.JobServiceImpl;
import com.ovopark.jobhub.sdk.model.JobStatus;
import com.ovopark.jobhub.sdk.model.TaskGetResponse;
import com.ovopark.jobhub.sdk.model.TaskLogPutRequest;
import com.ovopark.jobhub.sdk.model.TaskLogPutResponse;
import com.ovopark.jobhub.sdk.model.TaskMetaGetRequest;
import com.ovopark.jobhub.sdk.model.TaskMetaGetResponse;
import com.ovopark.jobhub.sdk.model.TaskUpdateRequest;
import com.ovopark.jobhub.sdk.model.TaskUpdateResponse;
import com.ovopark.kernel.shared.JSONAccessor;
import com.ovopark.kernel.shared.ShutdownManager;
import com.ovopark.kernel.shared.Util;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;

@Component
public class TaskListenerRunnerViaKafkaProvider
implements JobService.TaskListenerRunnerProvider,
CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(TaskListenerRunnerViaKafkaProvider.class);
    @Autowired
    ConsumerFactory<String, String> consumerFactory;
    private final Map<String, ConcurrentMessageListenerContainer<String, String>> containers = new ConcurrentHashMap<String, ConcurrentMessageListenerContainer<String, String>>();

    public String name() {
        return "kafka";
    }

    public boolean start(final String jobType, String beanUrl, final String group, Long minVer, final JobService.TaskListener taskListener, final JobService jobService, final JobService.TaskListenerRunnerProviderConfig taskListenerRunnerProviderConfig) {
        final String key = beanUrl + ":" + jobType + ":" + group;
        ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer)Util.lock((Comparable)((Object)key), (Callable)new Callable<ConcurrentMessageListenerContainer<String, String>>(){

            @Override
            public ConcurrentMessageListenerContainer<String, String> call() throws Exception {
                String containerId = key;
                if (TaskListenerRunnerViaKafkaProvider.this.containers.containsKey(containerId)) {
                    throw new IllegalStateException("Consumer already running for " + containerId);
                }
                TaskMetaGetRequest taskMetaGetRequest = new TaskMetaGetRequest();
                taskMetaGetRequest.setType(jobType);
                taskMetaGetRequest.setGroup(group);
                TaskMetaGetResponse taskMetaGetResponse = jobService.taskMetaGet(taskMetaGetRequest);
                if (taskMetaGetResponse == null || Util.isEmpty((CharSequence)taskMetaGetResponse.getTopic())) {
                    throw new IllegalStateException("cannot find topic " + JSONAccessor.impl().format((Object)taskMetaGetResponse));
                }
                String topic = taskMetaGetResponse.getTopic();
                String groupId = key;
                ContainerProperties containerProps = new ContainerProperties(new String[]{topic});
                containerProps.setGroupId(groupId);
                if (Util.isNotEmpty((CharSequence)taskListenerRunnerProviderConfig.getOffsetReset())) {
                    containerProps.getKafkaConsumerProperties().put("auto.offset.reset", taskListenerRunnerProviderConfig.getOffsetReset());
                }
                containerProps.setMessageListener(record -> {
                    try {
                        String value = (String)record.value();
                        TaskGetResponse.Task task = (TaskGetResponse.Task)JSONAccessor.impl().read(value, TaskGetResponse.Task.class);
                        TaskUpdateRequest taskUpdateRequest = new TaskUpdateRequest();
                        taskUpdateRequest.setId(task.getId());
                        JobServiceImpl.TaskContextImpl taskContext = new JobServiceImpl.TaskContextImpl();
                        taskContext.setTask(task);
                        String taskKey = "_job_" + task.getJobId() + "_task_" + task.getId();
                        MDC.put((String)"traceId", (String)taskKey);
                        MDC.put((String)"requestId", (String)taskKey);
                        JobStatus jobStatus = JobStatus.COMPLETED;
                        try {
                            taskListener.on((JobService.TaskContext)taskContext);
                            jobStatus = (JobStatus)Util.convert2Self((Object)taskContext.getJobStatus(), (Object)JobStatus.COMPLETED);
                        }
                        catch (Exception e) {
                            jobStatus = JobStatus.FAIL;
                        }
                        finally {
                            if (taskContext.isStatusManageManually()) {
                                taskUpdateRequest.setId(task.getId());
                                taskUpdateRequest.setDocIndexName(task.getDocIndexName());
                                taskUpdateRequest.setStatus(jobStatus.name());
                                taskUpdateRequest.setCompletedDesc(taskContext.getCompletedDesc());
                                taskUpdateRequest.setRequestDeviceUrl(taskContext.getRequestDeviceUrl());
                                taskUpdateRequest.setRequestDeviceArgs(taskContext.getRequestDeviceArgs());
                                taskUpdateRequest.setResponseFromDevice(taskContext.getResponseFromDevice());
                                TaskUpdateResponse taskUpdateResponse = jobService.taskUpdate(taskUpdateRequest);
                                log.info("taskUpdate: " + JSONAccessor.impl().format((Object)taskUpdateResponse));
                            }
                            List contentList = taskContext.getContentList();
                            TaskLogPutRequest taskLogPutRequest = new TaskLogPutRequest();
                            taskLogPutRequest.setJobId(task.getJobId());
                            taskLogPutRequest.setTaskId(task.getId());
                            taskLogPutRequest.setType(jobType);
                            taskLogPutRequest.setContentList(contentList);
                            TaskLogPutResponse taskLogPutResponse = jobService.taskLog(taskLogPutRequest);
                            log.info("taskLog: " + JSONAccessor.impl().format((Object)taskLogPutResponse));
                        }
                    }
                    catch (Exception e) {
                        log.error("Error processing record from topic: {}", (Object)topic, (Object)e);
                    }
                    finally {
                        MDC.remove((String)"traceId");
                        MDC.remove((String)"requestId");
                    }
                });
                ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(TaskListenerRunnerViaKafkaProvider.this.consumerFactory, containerProps);
                container.setConcurrency(Math.max(taskListenerRunnerProviderConfig.getConcurrency(), 1));
                container.setApplicationEventPublisher(event -> {});
                container.start();
                TaskListenerRunnerViaKafkaProvider.this.containers.put(containerId, (ConcurrentMessageListenerContainer<String, String>)container);
                log.info("Started dynamic consumer for topic: {}, group: {}", (Object)topic, (Object)groupId);
                return container;
            }
        });
        return container != null;
    }

    public void close() {
        for (ConcurrentMessageListenerContainer<String, String> container : this.containers.values()) {
            try {
                container.stop();
            }
            catch (Exception e) {
                log.error(e.getMessage(), (Throwable)e);
            }
        }
    }

    public void run(String ... args) throws Exception {
        ShutdownManager.getOrCreate().register(TaskListenerRunnerViaKafkaProvider.class.getName(), new Util.CatchRunnable(){

            public void run() throws Exception {
                TaskListenerRunnerViaKafkaProvider.this.close();
            }
        });
        log.info("register shutdown hook, jobhub kafka listener");
    }
}

