package com.ovopark.log.collect.appender;

import com.alibaba.fastjson.JSON;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.LinkedListMultimap;
import com.ovopark.log.collect.consts.LogConst;
import com.ovopark.log.collect.exception.LogException;
import com.ovopark.log.collect.kafka.KafkaProducerClient;
import com.ovopark.log.collect.model.KafkaHost;
import com.ovopark.log.collect.redis.KafkaHostsListener;
import com.ovopark.log.collect.redis.RedisTemplate;
import com.ovopark.log.collect.util.Pair;
import com.ovopark.log.collect.util.StrUtil;
import com.ovopark.log.collect.util.ThreadPoolUtil;
import java.io.IOException;
import java.net.InetAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:com/ovopark/log/collect/appender/LogMessageAppender.class */
public class LogMessageAppender {
    private static LogBlockingQueue logBlockingQueue;
    private static volatile KafkaProducerClient kafkaInstance;
    private static final AtomicLong LAST_RUN_PUSH_TIME = new AtomicLong(0);
    private static int queueSize = 10000;
    private static final Cache<Class<?>, Boolean> CACHE = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();

    public static void startCollectingLog(int i) {
        while (true) {
            try {
                doStartLog(i, kafkaInstance);
            } catch (InterruptedException e) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e2) {
                }
            }
        }
    }

    public static void initQueue(int i) {
        queueSize = i;
        if (logBlockingQueue == null) {
            logBlockingQueue = new LogBlockingQueue(i);
        }
    }

    public static void pushRunDataQueue(Pair<String, String> pair) {
        if (pair == null || logBlockingQueue.size() >= queueSize) {
            return;
        }
        logBlockingQueue.add(pair);
    }

    private static void doStartLog(int i, KafkaProducerClient kafkaProducerClient) throws InterruptedException {
        int size = logBlockingQueue.size();
        long currentTimeMillis = System.currentTimeMillis();
        long j = currentTimeMillis - LAST_RUN_PUSH_TIME.get();
        if (size >= i || j > 500) {
            push(logBlockingQueue.drainTo(i), kafkaProducerClient);
            LAST_RUN_PUSH_TIME.set(currentTimeMillis);
        } else if (size != 0) {
            Thread.sleep(100L);
        } else {
            push(logBlockingQueue.take(), kafkaProducerClient);
            LAST_RUN_PUSH_TIME.set(currentTimeMillis);
        }
    }

    public static void push(Pair<String, String> pair, KafkaProducerClient kafkaProducerClient) {
        Boolean bool = (Boolean) CACHE.getIfPresent(LogMessageAppender.class);
        if (bool == null || bool.booleanValue()) {
            try {
                kafkaProducerClient.putMessage(pair.getFirst(), pair.getSecond());
                CACHE.put(LogMessageAppender.class, true);
            } catch (LogException e) {
                CACHE.put(LogMessageAppender.class, false);
            }
        }
    }

    public static void push(LinkedListMultimap<String, String> linkedListMultimap, KafkaProducerClient kafkaProducerClient) {
        Boolean bool = (Boolean) CACHE.getIfPresent(LogMessageAppender.class);
        if (bool == null || bool.booleanValue()) {
            try {
                for (String str : linkedListMultimap.keySet()) {
                    kafkaProducerClient.putMessageList(str, linkedListMultimap.get(str));
                }
                CACHE.put(LogMessageAppender.class, true);
            } catch (LogException e) {
                CACHE.put(LogMessageAppender.class, false);
            }
        }
    }

    public static void lazyInitKafkaClient(String str, String... strArr) {
        if (kafkaInstance == null) {
            synchronized (LogMessageAppender.class) {
                if (kafkaInstance == null) {
                    initKafkaClient(str, strArr);
                }
                if (kafkaInstance == null) {
                    initJsonKafkaClient(null, strArr);
                }
            }
        }
    }

    public static void initKafkaClient(String str, String... strArr) {
        if (StrUtil.isBlank((CharSequence) str)) {
            return;
        }
        kafkaInstance = KafkaProducerClient.getClient(str, strArr);
    }

    public static void initJsonKafkaClient(String str, String... strArr) {
        if (StrUtil.isBlank((CharSequence) str)) {
            str = RedisTemplate.getInstance().get(LogConst.REDIS_KAFKA_HOST_KEY);
            RedisTemplate.getInstance().subscribe(new KafkaHostsListener(), LogConst.REDIS_KAFKA_HOST_PUBLISH_CHANNEL);
        }
        if (StrUtil.isBlank((CharSequence) str)) {
            throw new LogException("从Redis读取kafka配置出错, 请检查Redis或者本地配置");
        }
        try {
            List<KafkaHost> parseArray = JSON.parseArray(str, KafkaHost.class);
            CountDownLatch countDownLatch = new CountDownLatch(parseArray.size());
            LinkedList linkedList = new LinkedList();
            for (KafkaHost kafkaHost : parseArray) {
                ThreadPoolUtil.THREAD_POOL_CACHE.execute(() -> {
                    String innerIpAddress = kafkaHost.getInnerIpAddress();
                    try {
                        boolean isReachable = InetAddress.getByName(innerIpAddress).isReachable(2000);
                        System.out.println(isReachable ? "[dc-log] Hook up with " + innerIpAddress + " successfully!" : "[dc-log] Sadly dumped by " + innerIpAddress);
                        linkedList.add(kafkaHost.getValidKafkaHost(isReachable));
                        countDownLatch.countDown();
                    } catch (IOException e) {
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        countDownLatch.countDown();
                        throw th;
                    }
                });
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
            }
            kafkaInstance = KafkaProducerClient.getClient(String.join(",", linkedList), strArr);
        } catch (Exception e2) {
            throw new LogException("Redis读取的kafka格式有误, 请联系相关开发检查");
        }
    }
}
