/*
 * Decompiled with CFR 0.152.
 */
package com.volcengine.service.tls;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.volcengine.model.tls.ClientBuilder;
import com.volcengine.model.tls.ClientConfig;
import com.volcengine.model.tls.exception.LogException;
import com.volcengine.model.tls.pb.PutLogRequest;
import com.volcengine.model.tls.producer.BatchLog;
import com.volcengine.model.tls.producer.CallBack;
import com.volcengine.model.tls.producer.ProducerConfig;
import com.volcengine.service.tls.RetryManager;
import com.volcengine.service.tls.TLSLogClient;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class LogDispatcher {
    public static final String TLS_THREAD_POOL_FORMAT = "dispatcher-thread-%d";
    private final ProducerConfig producerConfig;
    private final ExecutorService executorService;
    private TLSLogClient client;
    private final String producerName;
    private final BlockingQueue<BatchLog> successQueue;
    private final BlockingQueue<BatchLog> failureQueue;
    private static final Log LOG = LogFactory.getLog(LogDispatcher.class);
    private final AtomicInteger addLogLock = new AtomicInteger(0);
    private volatile boolean closed;
    private final Semaphore memoryLock;
    private final AtomicInteger batchCount;
    private final RetryManager retryManager;
    private final ConcurrentHashMap<BatchLog.BatchKey, BatchLog.BatchManager> batches;

    public LogDispatcher(ProducerConfig producerConfig, String producerName, BlockingQueue<BatchLog> successQueue, BlockingQueue<BatchLog> failureQueue, Semaphore memoryLock, AtomicInteger batchCount, RetryManager retryManager) throws LogException {
        this.producerConfig = producerConfig;
        this.producerName = producerName;
        this.executorService = Executors.newFixedThreadPool(producerConfig.getMaxThreadCount(), new ThreadFactoryBuilder().setNameFormat(producerName + "-" + TLS_THREAD_POOL_FORMAT).setDaemon(true).build());
        this.memoryLock = memoryLock;
        this.successQueue = successQueue;
        this.failureQueue = failureQueue;
        this.batches = new ConcurrentHashMap();
        this.batchCount = batchCount;
        this.retryManager = retryManager;
        this.client = ClientBuilder.newClient(producerConfig.getClientConfig());
    }

    public TLSLogClient getClient() {
        return this.client;
    }

    public ConcurrentHashMap<BatchLog.BatchKey, BatchLog.BatchManager> getBatches() {
        return this.batches;
    }

    public void start() {
        this.closed = false;
        LOG.info((Object)String.format("log dispatcher %s started and client init success", this.producerName));
    }

    public ExecutorService getExecutorService() {
        return this.executorService;
    }

    public void close() {
        this.closed = true;
    }

    public void closeNow() {
        this.closed = true;
        this.executorService.shutdownNow();
    }

    private BatchLog.BatchManager getOrCreateBatchManager(BatchLog.BatchKey batchKey) {
        BatchLog.BatchManager batchManager = this.batches.get(batchKey);
        if (batchManager != null) {
            return batchManager;
        }
        batchManager = new BatchLog.BatchManager();
        BatchLog.BatchManager original = this.batches.putIfAbsent(batchKey, batchManager);
        return original != null ? original : batchManager;
    }

    public void resetAccessKeyToken(String accessKey, String secretKey, String securityToken) {
        ClientConfig clientConfig = this.producerConfig.getClientConfig();
        clientConfig.resetAccessKeyToken(accessKey, secretKey, securityToken);
        this.client.resetAccessKeyToken(accessKey, secretKey, securityToken);
        LOG.info((Object)String.format("log dispatcher %s update client config %s success", this.producerName, clientConfig));
    }

    public void addBatch(String hashKey, String topicId, String source, String filename, PutLogRequest.LogGroup logGroup, CallBack callBack) throws InterruptedException, LogException {
        this.doAdd(hashKey, topicId, source, filename, logGroup, callBack);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doAdd(String hashKey, String topicId, String source, String filename, PutLogRequest.LogGroup logGroup, CallBack callBack) throws LogException, InterruptedException {
        if (this.closed) {
            throw new LogException("Producer Error", "closed LogDispatcher cannot receive logs anymore", null);
        }
        int batchSize = this.calculateSize(logGroup);
        this.producerConfig.checkBatchSize(batchSize);
        long maxBlockMs = this.producerConfig.getMaxBlockMs();
        LOG.debug((Object)String.format("dispatcher %s try acquire memory lock ", this.producerName));
        if (maxBlockMs == 0L) {
            this.memoryLock.acquire();
        } else {
            boolean acquired = this.memoryLock.tryAcquire(batchSize, maxBlockMs, TimeUnit.MILLISECONDS);
            if (!acquired) {
                LOG.warn((Object)String.format("Failed to acquire memory within the configured max blocking time %d ms, requiredSizeInBytes=%d, availableSizeInBytes=%d", this.producerConfig.getMaxBlockMs(), batchSize, this.memoryLock.availablePermits()));
                throw new LogException("Producer Error", String.format("dispatcher %s try acquire memory lock failed", this.producerName), null);
            }
        }
        try {
            BatchLog.BatchManager batchManager;
            BatchLog.BatchKey batchKey = new BatchLog.BatchKey(hashKey, topicId, source, filename);
            BatchLog.BatchManager batchManager2 = batchManager = this.getOrCreateBatchManager(batchKey);
            synchronized (batchManager2) {
                this.addToBatchManager(batchKey, logGroup, callBack, batchSize, batchManager);
            }
        }
        catch (Exception e) {
            this.memoryLock.release(batchSize);
            throw new LogException("Producer Error", "dispatcher add batch concurrent error", null);
        }
    }

    private int calculateSize(PutLogRequest.LogGroup logGroup) {
        if (logGroup == null) {
            return 0;
        }
        return logGroup.getSerializedSize();
    }

    private void addToBatchManager(BatchLog.BatchKey batchKey, PutLogRequest.LogGroup logGroup, CallBack callBack, int batchSize, BatchLog.BatchManager batchManager) throws LogException {
        boolean success;
        BatchLog batchLog = batchManager.getBatchLog();
        if (batchLog != null) {
            success = batchLog.tryAdd(logGroup, batchSize, callBack);
            if (success) {
                if (batchManager.fullAndSendBatchRequest()) {
                    batchManager.addNow(this.producerConfig, this.executorService, this.client, this.successQueue, this.failureQueue, this.batchCount, this.retryManager);
                }
                return;
            }
            batchManager.addNow(this.producerConfig, this.executorService, this.client, this.successQueue, this.failureQueue, this.batchCount, this.retryManager);
        }
        batchLog = new BatchLog(batchKey, this.producerConfig);
        batchManager.setBatchLog(batchLog);
        success = batchLog.tryAdd(logGroup, batchSize, callBack);
        if (!success) {
            LOG.error((Object)String.format("tryAdd batchLog failed, batchKey = %s, batchSize = %d, batchCount = %d", batchKey.toString(), batchSize, logGroup.getLogsCount()));
            throw new LogException("Producer Error", "tryAdd batchLog failed", null);
        }
        if (batchManager.fullAndSendBatchRequest()) {
            batchManager.addNow(this.producerConfig, this.executorService, this.client, this.successQueue, this.failureQueue, this.batchCount, this.retryManager);
        }
    }
}

