/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.config.redis;

import com.ovopark.config.MdcThreadPoolTaskExecutor;
import com.ovopark.config.redis.DistributedDelayedQueueListener;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RLock;
import org.redisson.api.RQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.CollectionUtils;
import reactor.util.annotation.NonNull;

public class DistributedDelayedQueueInit
implements ApplicationListener<ApplicationStartedEvent> {
    private static final Logger log = LoggerFactory.getLogger(DistributedDelayedQueueInit.class);
    @Value(value="${node}")
    private Integer node;
    private final List<DistributedDelayedQueueListener<?>> distributedDelayedQueueListenerList;
    private final RedissonClient redissonClient;
    private final RedisTemplate redisTemplate;
    public MdcThreadPoolTaskExecutor executor = new MdcThreadPoolTaskExecutor(5, 40, 60, Integer.MAX_VALUE, "ins");
    public static final String LOCK_PREFIX = "DISTRIBUTED_DELAYED_QUEUE_INIT_LOCK";
    public static final String FLAG_PREFIX = "DISTRIBUTED_DELAYED_QUEUE_INIT_FLAG";
    private volatile boolean running = true;

    public DistributedDelayedQueueInit(List<DistributedDelayedQueueListener<?>> distributedDelayedQueueListenerList, RedissonClient redissonClient, RedisTemplate redisTemplate) {
        this.distributedDelayedQueueListenerList = distributedDelayedQueueListenerList;
        this.redissonClient = redissonClient;
        this.redisTemplate = redisTemplate;
    }

    @PreDestroy
    public void destroy() {
        log.info("\u5f00\u59cb\u9500\u6bc1\u5ef6\u65f6\u961f\u5217\u4efb\u52a1\u76d1\u542c\u7ebf\u7a0b...");
        this.executor.shutdown();
        log.info("\u7ed3\u675f\u9500\u6bc1\u5ef6\u65f6\u961f\u5217\u4efb\u52a1\u76d1\u542c\u7ebf\u7a0b...");
    }

    private <T> void startThread(String queueName, DistributedDelayedQueueListener<T> listener) {
        this.executor.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                RBlockingQueue blockingFairQueue = this.redissonClient.getBlockingQueue(queueName);
                this.redissonClient.getDelayedQueue((RQueue)blockingFairQueue);
                if (!this.running) continue;
                try {
                    Object t = blockingFairQueue.take();
                    log.info("listener queue {},get :{}", (Object)queueName, t);
                    this.executor.execute(() -> {
                        try {
                            listener.invoke(t);
                        }
                        catch (Exception e) {
                            log.error("delay queue {}push plan error\uff1a", (Object)queueName, (Object)e);
                        }
                    });
                }
                catch (Exception e) {
                    e.printStackTrace();
                    log.error("delay queue {}push plan error\uff1a", (Object)queueName, (Object)e);
                    Thread.currentThread().interrupt();
                }
            }
        });
        log.info("=====>\u542f\u52a8\u76d1\u542c\u4efb\u52a1{}\u6210\u529f...", (Object)queueName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onApplicationEvent(@NonNull ApplicationStartedEvent applicationStartedEvent) {
        if (CollectionUtils.isEmpty(this.distributedDelayedQueueListenerList)) {
            return;
        }
        log.info("===> \u5f00\u59cb\u521d\u59cb\u5316\u5206\u5e03\u5f0f\u5ef6\u65f6\u961f\u5217 ...");
        for (DistributedDelayedQueueListener<?> listener : this.distributedDelayedQueueListenerList) {
            String className = listener.getClass().getName();
            this.startThread(className, listener);
            RLock lock = this.redissonClient.getLock("DISTRIBUTED_DELAYED_QUEUE_INIT_LOCK:" + className);
            if (!lock.tryLock()) continue;
            try {
                String listenerFlag = "DISTRIBUTED_DELAYED_QUEUE_INIT_FLAG:" + className;
                if (!Boolean.TRUE.equals(this.redisTemplate.opsForValue().setIfAbsent((Object)listenerFlag, (Object)"\u5df2\u521d\u59cb\u5316", 5L, TimeUnit.MINUTES))) continue;
                listener.init();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            finally {
                lock.unlock();
            }
        }
        log.info("===> \u5ef6\u65f6\u961f\u5217\u521d\u59cb\u5316\u5b8c\u6210 ...");
    }

    public void stopAll() {
        this.running = false;
    }
}

