/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.device.platform.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.slf4j.Logger;

/**
 * Static helper that splits a large input list into batches, runs a caller-supplied
 * batch function on each batch concurrently (one virtual thread per batch, throttled by a
 * semaphore), and merges the per-batch maps into a single result map.
 *
 * <p>Failures are best-effort: a batch that throws, is interrupted, or does not finish
 * within the timeout contributes nothing to the result and is only reported in the log.
 */
public class BatchConcurrentProcessor {
    /** Batch size callers may pass; also used as fallback when a non-positive size is given. */
    public static final int DEFAULT_BATCH_SIZE = 5000;
    /** Timeout callers may pass; also used as fallback when a non-positive timeout is given. */
    public static final int DEFAULT_TIMEOUT_SECONDS = 30;
    /** Up to this many batches, every batch gets its own permit. */
    private static final int MAX_CONCURRENCY_50 = 50;
    /** Up to this many batches, {@link #CONCURRENCY_MEDIUM} permits are used. */
    private static final int MAX_CONCURRENCY_100 = 100;
    private static final int CONCURRENCY_MEDIUM = 500;
    private static final int CONCURRENCY_HIGH = 1000;

    private BatchConcurrentProcessor() {
    }

    /**
     * Applies {@code batchProcessor} to {@code inputList}, in one call when the list fits in
     * a single batch, otherwise concurrently batch by batch.
     *
     * @param inputList      keys to process; {@code null} or empty yields an empty map
     * @param methodName     label used as prefix in every log line
     * @param batchSize      maximum number of keys per batch; a non-positive value falls
     *                       back to {@link #DEFAULT_BATCH_SIZE}
     * @param timeoutSecond  maximum time to wait for all batches; a non-positive value falls
     *                       back to {@link #DEFAULT_TIMEOUT_SECONDS}
     * @param logger         logger that receives progress, warning and error messages
     * @param batchProcessor function that maps one batch of keys to its results
     * @param <K>            key type
     * @param <V>            value type
     * @return the processor's own map (or an empty {@link HashMap}) on the single-batch
     *         path; otherwise a {@link ConcurrentHashMap} holding the merged results of all
     *         batches that completed successfully. Never {@code null}.
     */
    public static <K, V> Map<K, V> processBatchConcurrentlyGeneric(List<K> inputList, String methodName, int batchSize, int timeoutSecond, Logger logger, Function<List<K>, Map<K, V>> batchProcessor) {
        if (inputList == null || inputList.isEmpty()) {
            return new HashMap<>();
        }
        // A non-positive batch size would partition into zero batches and silently drop all input.
        int effectiveBatchSize = batchSize;
        if (effectiveBatchSize <= 0) {
            logger.warn("{}: invalid batchSize={}, falling back to default {}", methodName, batchSize, DEFAULT_BATCH_SIZE);
            effectiveBatchSize = DEFAULT_BATCH_SIZE;
        }
        int totalSize = inputList.size();
        if (totalSize <= effectiveBatchSize) {
            try {
                Map<K, V> direct = batchProcessor.apply(inputList);
                return direct != null ? direct : new HashMap<>();
            } catch (Exception e) {
                logger.error("{}: exception occurred while processing list directly", methodName, e);
                return new HashMap<>();
            }
        }
        // A non-positive timeout would expire immediately and cancel every batch.
        int effectiveTimeout = timeoutSecond;
        if (effectiveTimeout <= 0) {
            logger.warn("{}: invalid timeoutSeconds={}, falling back to default {}", methodName, timeoutSecond, DEFAULT_TIMEOUT_SECONDS);
            effectiveTimeout = DEFAULT_TIMEOUT_SECONDS;
        }
        List<List<K>> batches = partitionList(inputList, effectiveBatchSize);
        int maxConcurrency = calculateMaxConcurrency(batches.size());
        Semaphore semaphore = new Semaphore(maxConcurrency);
        logger.info("{}: totalSize={}, batchSize={}, batchCount={}, maxConcurrency={}, timeoutSeconds={}", methodName, totalSize, effectiveBatchSize, batches.size(), maxConcurrency, effectiveTimeout);
        long startTime = System.currentTimeMillis();
        Map<K, V> result = new ConcurrentHashMap<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<Map<K, V>>> futures = createFutures(batches, batchProcessor, semaphore, executor, logger, methodName);
            boolean timeoutOccurred = waitForCompletion(futures, methodName, effectiveTimeout, logger);
            if (timeoutOccurred || Thread.currentThread().isInterrupted()) {
                // CompletableFuture.cancel does not interrupt the worker thread, and the
                // implicit close() below waits for every task to finish. Interrupt the
                // workers explicitly so the timeout actually bounds this call.
                executor.shutdownNow();
            }
            mergeResults(futures, result, timeoutOccurred, startTime, methodName, effectiveTimeout, logger);
        }
        return result;
    }

    /**
     * Splits {@code list} into consecutive, independent copies of at most
     * {@code partitionSize} elements each.
     *
     * @return the partitions in input order; empty when the list is {@code null}/empty or
     *         the size is not positive
     */
    private static <T> List<List<T>> partitionList(List<T> list, int partitionSize) {
        if (list == null || list.isEmpty() || partitionSize <= 0) {
            return new ArrayList<>();
        }
        int size = list.size();
        List<List<T>> partitions = new ArrayList<>((size - 1) / partitionSize + 1);
        int from = 0;
        while (from < size) {
            // Computing the slice length from the remainder avoids int overflow of from + partitionSize.
            int length = Math.min(partitionSize, size - from);
            partitions.add(new ArrayList<>(list.subList(from, from + length)));
            from += length;
        }
        return partitions;
    }

    /**
     * Picks the number of semaphore permits for a given batch count.
     *
     * <p>Note that for more than {@link #MAX_CONCURRENCY_50} and up to
     * {@link #MAX_CONCURRENCY_100} batches the permit count exceeds the batch count, so the
     * semaphore does not actually throttle in that range.
     */
    private static int calculateMaxConcurrency(int batchCount) {
        if (batchCount <= MAX_CONCURRENCY_50) {
            return batchCount;
        }
        if (batchCount <= MAX_CONCURRENCY_100) {
            return CONCURRENCY_MEDIUM;
        }
        return CONCURRENCY_HIGH;
    }

    /**
     * Submits one asynchronous task per batch to {@code executor}.
     *
     * @return the futures in the same order as {@code batches}
     */
    private static <K, V> List<CompletableFuture<Map<K, V>>> createFutures(List<List<K>> batches, Function<List<K>, Map<K, V>> batchProcessor, Semaphore semaphore, ExecutorService executor, Logger logger, String methodName) {
        int batchCount = batches.size();
        List<CompletableFuture<Map<K, V>>> futures = new ArrayList<>(batchCount);
        for (int i = 0; i < batchCount; i++) {
            int batchNumber = i + 1;
            List<K> batch = batches.get(i);
            futures.add(CompletableFuture.supplyAsync(
                    () -> processBatch(batch, batchNumber, batchCount, batchProcessor, semaphore, logger, methodName),
                    executor));
        }
        return futures;
    }

    /**
     * Runs a single batch while holding one semaphore permit.
     *
     * @return the processor's result (possibly {@code null}), or an empty map when the
     *         thread was interrupted or the processor threw an exception
     */
    private static <K, V> Map<K, V> processBatch(List<K> batch, int batchNumber, int batchCount, Function<List<K>, Map<K, V>> batchProcessor, Semaphore semaphore, Logger logger, String methodName) {
        boolean acquired = false;
        try {
            semaphore.acquire();
            acquired = true;
            if (Thread.currentThread().isInterrupted()) {
                return new HashMap<>();
            }
            return batchProcessor.apply(batch);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("{}: Batch {}/{} processing interrupted", methodName, batchNumber, batchCount);
            return new HashMap<>();
        } catch (Exception e) {
            logger.error("{}: Exception occurred while processing batch {}/{}: {}", methodName, batchNumber, batchCount, e.getMessage(), e);
            return new HashMap<>();
        } finally {
            // Release only if acquire() succeeded, otherwise the permit count would grow.
            if (acquired) {
                semaphore.release();
            }
        }
    }

    /**
     * Blocks until every future is done or {@code timeoutSecond} seconds have elapsed.
     * On timeout or interruption the futures that are not yet done are cancelled.
     *
     * @return {@code true} only when the wait ended because of the timeout
     */
    private static <K, V> boolean waitForCompletion(List<CompletableFuture<Map<K, V>>> futures, String methodName, int timeoutSecond, Logger logger) {
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
        try {
            allFutures.get(timeoutSecond, TimeUnit.SECONDS);
            return false;
        } catch (TimeoutException e) {
            long completedCount = futures.stream().filter(CompletableFuture::isDone).count();
            logger.warn("{}: timed out while waiting for all batches to complete ({} seconds), completed batches: {}/{}", methodName, timeoutSecond, completedCount, futures.size());
            cancelPendingFutures(futures);
            return true;
        } catch (InterruptedException e) {
            logger.error("{}: interrupted while waiting for all batches to complete", methodName, e);
            Thread.currentThread().interrupt();
            cancelPendingFutures(futures);
            return false;
        } catch (ExecutionException e) {
            logger.error("{}: execution exception while waiting for all batches to complete", methodName, e);
            return false;
        }
    }

    /** Cancels every future that has not completed yet. */
    private static <K, V> void cancelPendingFutures(List<CompletableFuture<Map<K, V>>> futures) {
        for (CompletableFuture<Map<K, V>> future : futures) {
            // isDone() is also true for cancelled futures, so one check covers both states.
            if (!future.isDone()) {
                future.cancel(true);
            }
        }
    }

    /**
     * Copies the results of all successfully completed futures into {@code result} and logs
     * a summary. Cancelled or unfinished futures are counted as pending; futures that failed
     * or produced a {@code null}/empty map are counted as errors.
     */
    private static <K, V> void mergeResults(List<CompletableFuture<Map<K, V>>> futures, Map<K, V> result, boolean timeoutOccurred, long startTime, String methodName, int timeoutSecond, Logger logger) {
        int successCount = 0;
        int errorCount = 0;
        int pendingCount = 0;
        for (CompletableFuture<Map<K, V>> future : futures) {
            if (!future.isDone() || future.isCancelled()) {
                pendingCount++;
                continue;
            }
            Map<K, V> batchResult;
            try {
                batchResult = future.getNow(null);
            } catch (RuntimeException e) {
                // The future completed exceptionally (the worker itself only lets Errors escape).
                logger.warn("{}: batch completed exceptionally: {}", methodName, e.getMessage(), e);
                errorCount++;
                continue;
            }
            if (mergeBatch(result, batchResult, methodName, logger)) {
                successCount++;
            } else {
                errorCount++;
            }
        }
        long remainingTime = (long) timeoutSecond * 1000L - (System.currentTimeMillis() - startTime);
        if (pendingCount > 0 && !timeoutOccurred && remainingTime > 0L) {
            // Short grace period for stragglers, bounded by what is left of the overall timeout.
            long quickWaitTime = Math.min(1000L, remainingTime);
            for (CompletableFuture<Map<K, V>> future : futures) {
                if (future.isDone()) {
                    continue;
                }
                Map<K, V> batchResult;
                try {
                    batchResult = future.get(quickWaitTime, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop waiting.
                    Thread.currentThread().interrupt();
                    break;
                } catch (ExecutionException | TimeoutException | RuntimeException e) {
                    logger.warn("{}: batch still unavailable after grace wait: {}", methodName, e.toString());
                    continue;
                }
                pendingCount--;
                if (mergeBatch(result, batchResult, methodName, logger)) {
                    successCount++;
                } else {
                    errorCount++;
                }
            }
        }
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (pendingCount > 0) {
            logger.warn("{}: processing completed partially, totalResultSize={}, successBatches={}, errorBatches={}, pendingBatches={}, elapsedTime={}ms", methodName, result.size(), successCount, errorCount, pendingCount, elapsedTime);
        } else {
            logger.info("{}: processing completed, totalResultSize={}, successBatches={}, errorBatches={}, elapsedTime={}ms", methodName, result.size(), successCount, errorCount, elapsedTime);
        }
    }

    /**
     * Adds one batch result to the merged map.
     *
     * @return {@code true} when the batch was non-empty and merged; {@code false} when it
     *         was {@code null}/empty or the target map rejected it (for example a
     *         {@link ConcurrentHashMap} rejects {@code null} keys and values)
     */
    private static <K, V> boolean mergeBatch(Map<K, V> result, Map<K, V> batchResult, String methodName, Logger logger) {
        try {
            if (batchResult == null || batchResult.isEmpty()) {
                return false;
            }
            result.putAll(batchResult);
            return true;
        } catch (RuntimeException e) {
            logger.warn("{}: failed to merge batch result: {}", methodName, e.getMessage(), e);
            return false;
        }
    }
}

