/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.dc.processor.sender;

import com.ovopark.dc.processor.accumulator.RecordAccumulator;
import com.ovopark.dc.processor.client.StorageOssClient;
import com.ovopark.dc.processor.client.StorageOssSyncClient;
import com.ovopark.dc.processor.client.callback.ResponseHandler;
import com.ovopark.dc.processor.entity.FileRecordBuilder;
import com.ovopark.dc.processor.entity.ProducerBatch;
import com.ovopark.dc.processor.entity.StorageTypeConnectionState;
import com.ovopark.dc.processor.entity.request.AliyunUploadRequest;
import com.ovopark.dc.processor.entity.request.AliyunUploadResponse;
import com.ovopark.dc.processor.entity.request.ClientRequest;
import com.ovopark.dc.processor.entity.request.ClientResponse;
import com.ovopark.dc.storage.broker.client.upload.AbstractUploadClient;
import com.ovopark.dc.storage.broker.client.upload.AliyunUploadClient;
import com.ovopark.dc.storage.broker.model.dto.StorageBucketDTO;
import com.ovopark.dc.storage.broker.model.enums.ErrorEnum;
import com.ovopark.dc.storage.broker.model.enums.StorageTypeEnum;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Sender
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(Sender.class);
    private volatile boolean running = true;
    private final StorageOssClient client;
    private final int maxSendSize;
    private RecordAccumulator recordAccumulator;
    private Map<StorageBucketDTO, List<ProducerBatch>> inFlightBatches;
    private AliyunUploadClient aliyunUploadClient;

    public Sender(RecordAccumulator recordAccumulator, StorageTypeConnectionState connectionStates, AliyunUploadClient aliyunUploadClient) {
        this.client = new StorageOssSyncClient(connectionStates);
        this.maxSendSize = 0x100000;
        this.recordAccumulator = recordAccumulator;
        this.aliyunUploadClient = aliyunUploadClient;
        this.inFlightBatches = new HashMap<StorageBucketDTO, List<ProducerBatch>>();
    }

    @Override
    public void run() {
        while (this.running) {
            try {
                this.doRunOnce();
            }
            catch (Exception e) {
                log.error("\u53d1\u9001\u8fc7\u7a0b\u4e2d\u53d1\u751f\u5f02\u5e38\uff1a", (Throwable)e);
            }
        }
    }

    private void doRunOnce() {
        long pollTimeout = this.sendProducerData();
        this.client.poll(pollTimeout);
    }

    private long sendProducerData() {
        RecordAccumulator.StorageTypeReadyResult readyResult = this.recordAccumulator.ready();
        if (!readyResult.unknownStorage.isEmpty()) {
            log.error("\u5b58\u5728\u6709\u672a\u77e5\u7684\u4e3b\u9898\u6570\u636e\uff1a{}", readyResult.unknownStorage);
        }
        Iterator<Integer> iterator = readyResult.readyStorage.iterator();
        while (iterator.hasNext()) {
            int storage = iterator.next();
            if (this.client.ready(storage)) continue;
            iterator.remove();
            log.error("{}\uff1a\u65e0\u6cd5\u8fde\u63a5\u4e0a", (Object)StorageTypeEnum.getValue((int)storage));
        }
        long pollTimeout = 3000L;
        if (!readyResult.readyStorage.isEmpty()) {
            pollTimeout = 0L;
        }
        Map<Integer, List<ProducerBatch>> batches = this.recordAccumulator.drain(readyResult.readyStorage, this.maxSendSize);
        this.addInFlightBatches(batches);
        this.sendProduceRequests(batches);
        return pollTimeout;
    }

    private void addInFlightBatches(Map<Integer, List<ProducerBatch>> batches) {
        for (Map.Entry<Integer, List<ProducerBatch>> entry : batches.entrySet()) {
            this.addInFlightBatch(entry.getValue());
        }
    }

    private void addInFlightBatch(List<ProducerBatch> value) {
        for (ProducerBatch producerBatch : value) {
            List producerBatchList = this.inFlightBatches.computeIfAbsent(producerBatch.storageTypeBusiness(), k -> new ArrayList());
            producerBatchList.add(producerBatch);
        }
    }

    private void sendProduceRequests(Map<Integer, List<ProducerBatch>> batches) {
        for (Map.Entry<Integer, List<ProducerBatch>> entry : batches.entrySet()) {
            this.sendProduceRequest(entry.getKey(), entry.getValue());
        }
    }

    private void sendProduceRequest(Integer storageType, List<ProducerBatch> batches) {
        if (batches.isEmpty()) {
            return;
        }
        HashMap<StorageBucketDTO, FileRecordBuilder> requestRecord = new HashMap<StorageBucketDTO, FileRecordBuilder>();
        HashMap<StorageBucketDTO, ProducerBatch> callbackRecord = new HashMap<StorageBucketDTO, ProducerBatch>();
        for (ProducerBatch producerBatch : batches) {
            StorageBucketDTO storageTypeBusiness = producerBatch.storageTypeBusiness();
            requestRecord.put(storageTypeBusiness, producerBatch.fileRecordBuilder());
            callbackRecord.put(storageTypeBusiness, producerBatch);
        }
        AliyunUploadRequest.Builder builder = new AliyunUploadRequest.Builder(requestRecord, (AbstractUploadClient)this.aliyunUploadClient);
        ResponseHandler responseHandler = response -> this.handlerResponse(response, callbackRecord);
        ClientRequest clientRequest = this.client.newClientRequest(storageType, builder, responseHandler);
        this.client.send(clientRequest);
    }

    private void handlerResponse(ClientResponse response, Map<StorageBucketDTO, ProducerBatch> callbackRecord) {
        if (response.isDisconnected()) {
            for (Map.Entry<StorageBucketDTO, ProducerBatch> entry : callbackRecord.entrySet()) {
                this.completeBatch(entry.getValue(), new AliyunUploadResponse.Response(ErrorEnum.DISCONNECTED));
            }
        } else if (response.sending()) {
            AliyunUploadResponse responseBody = (AliyunUploadResponse)response.getResponseBody();
            for (Map.Entry<StorageBucketDTO, ProducerBatch> entry : callbackRecord.entrySet()) {
                this.processBatch(entry.getValue(), new AliyunUploadResponse.Response(ErrorEnum.NONE, responseBody.responses().get(entry.getKey()).getResult()));
            }
        } else if (response.getResponseBody() != null) {
            AliyunUploadResponse responseBody = (AliyunUploadResponse)response.getResponseBody();
            for (Map.Entry<StorageBucketDTO, AliyunUploadResponse.Response> responses : responseBody.responses().entrySet()) {
                StorageBucketDTO storageUploadDTO = responses.getKey();
                ProducerBatch producerBatch = callbackRecord.get(storageUploadDTO);
                this.completeBatch(producerBatch, responses.getValue());
            }
        } else {
            for (Map.Entry<StorageBucketDTO, ProducerBatch> entry : callbackRecord.entrySet()) {
                this.completeBatch(entry.getValue(), new AliyunUploadResponse.Response(ErrorEnum.NONE));
            }
        }
    }

    private void processBatch(ProducerBatch producerBatch, AliyunUploadResponse.Response response) {
        producerBatch.uploadRate(response.getResult());
    }

    private void completeBatch(ProducerBatch producerBatch, AliyunUploadResponse.Response response) {
        this.doCompleteBatch(producerBatch, response);
    }

    private void doCompleteBatch(ProducerBatch producerBatch, AliyunUploadResponse.Response response) {
        if (producerBatch.done(response.getResult(), response.getError().getException())) {
            this.removeBatchOrAccumulator(producerBatch);
        }
    }

    private void removeBatchOrAccumulator(ProducerBatch producerBatch) {
        this.removeBatch(producerBatch);
        this.recordAccumulator.remove(producerBatch);
    }

    private void removeBatch(ProducerBatch producerBatch) {
        List<ProducerBatch> batches = this.inFlightBatches.get(producerBatch.storageTypeBusiness());
        if (batches != null) {
            batches.remove(producerBatch);
            if (batches.isEmpty()) {
                this.inFlightBatches.remove(producerBatch.storageTypeBusiness());
            }
        }
    }

    public void flush() {
        this.client.flush();
    }
}

