/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.kernel.shared.stream;

import com.ovopark.kernel.shared.Util;
import com.ovopark.kernel.shared.kv.CacheService;
import com.ovopark.kernel.shared.stream.Publisher;
import com.ovopark.kernel.shared.stream.Stream;
import com.ovopark.kernel.shared.stream.Subscriber;
import com.ovopark.kernel.shared.stream.Subscription;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.zip.GZIPInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TextStream<T>
extends Stream.StreamExt<T, TextStream<T>> {
    /** Predicate that accepts every text: each part is treated as the start of a new logical line. */
    public static final NewLinePredicate ANY = l -> true;

    /**
     * Wraps an existing stream; instances are obtained through the static {@code from}/{@code fromGZIP}
     * factories and through {@link #mergeLine}.
     *
     * @param stream the underlying stream handed to the superclass constructor
     */
    private TextStream(Stream stream) {
        super(stream);
    }

    /**
     * Creates a text stream whose elements are pulled from the given supplier.
     *
     * <p>If the supplier also implements {@link Stream.DoFinally}, its {@code runFinally} hook is
     * registered on the stream via {@code doFinally}.
     *
     * @param fileEntrySupplier source of {@link Part} batches
     * @param syncConfFsync     progress store passed through to the supplier on every fetch
     * @param <T>               element type exposed by the returned stream (unchecked; the publisher emits {@link Part})
     * @return a new stream backed by {@code fileEntrySupplier}
     */
    public static <T> TextStream<T> from(FileEntrySupplier<Part> fileEntrySupplier, FileEntrySupplier.FileSyncConfFsync syncConfFsync) {
        FileEntryPublisher publisher = new FileEntryPublisher(fileEntrySupplier, syncConfFsync);
        // The publisher emits Part elements, so the stream is a Stream<Part> (not a stream of publishers).
        Stream<Part> partStream = Stream.from(publisher);
        if (fileEntrySupplier instanceof Stream.DoFinally) {
            final Stream.DoFinally cleanup = (Stream.DoFinally) fileEntrySupplier;
            partStream = partStream.doFinally(type -> cleanup.runFinally(type), false);
        }
        return new TextStream<T>(partStream);
    }

    /**
     * Opens a plain text file as a stream, using a listener whose callbacks do nothing.
     *
     * @param filePath      path of the file to read
     * @param continueGet   policy deciding whether reading goes on after a fetch; may be {@code null}
     * @param syncConfFsync progress store used to resume and record the read position
     * @param <T>           element type exposed by the returned stream
     * @return the stream produced by the four-argument overload
     */
    public static <T> TextStream<T> from(String filePath, ContinueGet continueGet, FileEntrySupplier.FileSyncConfFsync syncConfFsync) {
        FileEntrySupplier.FileSyncConfFsyncListener noOpListener = new FileEntrySupplier.FileSyncConfFsyncListener(){};
        return TextStream.from(filePath, continueGet, syncConfFsync, noOpListener);
    }

    /**
     * Opens a plain text file as a stream.
     *
     * @param filePath                  path of the file to read
     * @param continueGet               policy deciding whether reading goes on after a fetch; may be {@code null}
     * @param syncConfFsync             progress store used to resume and record the read position
     * @param fileSyncConfFsyncListener callbacks invoked around every progress write
     * @param <T>                       element type exposed by the returned stream
     * @return a stream backed by a {@link FileEntrySupplierImpl} for {@code filePath}
     * @throws RuntimeException the result of {@code Util.convert2RuntimeException} when the file cannot be opened
     */
    public static <T> TextStream<T> from(String filePath, ContinueGet continueGet, FileEntrySupplier.FileSyncConfFsync syncConfFsync, FileEntrySupplier.FileSyncConfFsyncListener fileSyncConfFsyncListener) {
        final FileEntrySupplierImpl supplier;
        try {
            supplier = new FileEntrySupplierImpl(filePath, continueGet, fileSyncConfFsyncListener);
        }
        catch (IOException openFailure) {
            throw Util.convert2RuntimeException(openFailure);
        }
        return TextStream.from(supplier, syncConfFsync);
    }

    /**
     * Opens a gzip-compressed text file as a stream, using a listener whose callbacks do nothing.
     *
     * @param filePath      path of the gzip file to read
     * @param syncConfFsync progress store used to resume and record the read position
     * @param <T>           element type exposed by the returned stream
     * @return the stream produced by the three-argument overload
     */
    public static <T> TextStream<T> fromGZIP(String filePath, FileEntrySupplier.FileSyncConfFsync syncConfFsync) {
        FileEntrySupplier.FileSyncConfFsyncListener noOpListener = new FileEntrySupplier.FileSyncConfFsyncListener(){};
        return TextStream.fromGZIP(filePath, syncConfFsync, noOpListener);
    }

    /**
     * Opens a gzip-compressed text file as a stream.
     *
     * @param filePath                  path of the gzip file to read
     * @param syncConfFsync             progress store used to resume and record the read position
     * @param fileSyncConfFsyncListener callbacks invoked around every progress write
     * @param <T>                       element type exposed by the returned stream
     * @return a stream backed by a {@link GZFileEntrySupplierImpl} for {@code filePath}
     * @throws RuntimeException the result of {@code Util.convert2RuntimeException} when the file cannot be opened
     */
    public static <T> TextStream<T> fromGZIP(String filePath, FileEntrySupplier.FileSyncConfFsync syncConfFsync, FileEntrySupplier.FileSyncConfFsyncListener fileSyncConfFsyncListener) {
        final GZFileEntrySupplierImpl supplier;
        try {
            supplier = new GZFileEntrySupplierImpl(filePath, fileSyncConfFsyncListener);
        }
        catch (IOException openFailure) {
            throw Util.convert2RuntimeException(openFailure);
        }
        return TextStream.from(supplier, syncConfFsync);
    }

    /**
     * Groups consecutive parts into {@link Line}s.
     *
     * <p>A new line is started whenever {@code newLinePredicate} accepts a part's text and the line
     * being built is not empty.
     *
     * @param newLinePredicate decides whether a part's text begins a new line
     * @param maxLineCount     maximum number of parts a single line may hold before merging fails
     * @return a stream of merged lines linked behind this stream
     */
    public final TextStream<Line> mergeLine(NewLinePredicate newLinePredicate, int maxLineCount) {
        MergeOperator operator = new MergeOperator(this.getStream(), newLinePredicate, maxLineCount);
        return (TextStream)this.linkStream(operator);
    }

    /**
     * Splits the first {@code position} bytes of {@code bytes} into text lines and appends one
     * {@link Part} per complete line to {@code lineList}.
     *
     * <p>A line is a run of bytes that starts at the first byte that is neither CR nor LF and ends
     * just before the next CR or LF; empty lines are therefore skipped. Lines are decoded as UTF-8.
     *
     * @param lineList      receives the complete lines found in this call
     * @param bytes         buffer to scan; only {@code [0, position)} is treated as valid data
     * @param position      number of valid bytes in {@code bytes}
     * @param blockPosition value stored in every created {@link Part}
     * @return the valid bytes that follow the last line terminator consumed by this call (the
     *         unfinished tail to prepend to the next buffer); when no complete line was found, all
     *         valid bytes are returned
     */
    private static byte[] lines(List<Part> lineList, byte[] bytes, int position, long blockPosition) {
        // Never look past the valid region, and tolerate a position outside the array bounds.
        int limit = Math.max(0, Math.min(position, bytes.length));
        int sizeBefore = lineList.size();
        boolean inLine = false;
        int lineStart = 0;
        int lastTerminator = 0;
        for (int i = 0; i < limit; ++i) {
            byte b = bytes[i];
            boolean terminator = b == '\n' || b == '\r';
            if (!inLine) {
                // Leading terminators (blank lines, the LF of a CRLF pair) are skipped.
                if (!terminator) {
                    inLine = true;
                    lineStart = i;
                }
                continue;
            }
            if (!terminator) {
                continue;
            }
            lastTerminator = i;
            inLine = false;
            // Decode straight from the buffer; i > lineStart always holds here.
            String line = new String(bytes, lineStart, i - lineStart, StandardCharsets.UTF_8);
            lineList.add(new Part(line, blockPosition));
        }
        if (lineList.size() == sizeBefore) {
            // No complete line: carry everything over, but only the valid bytes. Returning the
            // whole array would drag unfilled padding into the next round.
            if (limit == bytes.length) {
                return bytes;
            }
            byte[] pending = new byte[limit];
            System.arraycopy(bytes, 0, pending, 0, limit);
            return pending;
        }
        int restLength = limit - lastTerminator - 1;
        byte[] rest = new byte[restLength];
        System.arraycopy(bytes, lastTerminator + 1, rest, 0, restLength);
        return rest;
    }

    /**
     * Publisher that pulls batches of {@link Part}s from a {@link FileEntrySupplier} and pushes them
     * one by one to a subscriber.
     *
     * <p>All work happens synchronously inside {@code Subscription.request}. The supplier is asked
     * for a new batch whenever the current one is used up and {@code continueGet} allows it; the
     * largest part seen so far (according to the supplier's comparator) is passed as the cursor of
     * the next fetch.
     *
     * <p>NOTE(review): when {@code request(n)} emits all {@code n} items without the supplier
     * signalling the end, the loop still falls through to {@code onComplete}. That looks intended
     * for callers using an effectively unbounded {@code n} - confirm before calling with small demands.
     */
    static final class FileEntryPublisher
    implements Publisher<Part> {
        private static final Logger log = LoggerFactory.getLogger(FileEntryPublisher.class);
        private final FileEntrySupplier<Part> entrySupplier;
        public final FileEntrySupplier.FileSyncConfFsync syncConfFsync;

        /**
         * @param entrySupplier source of part batches
         * @param syncConfFsync progress store handed to the supplier on every fetch and on completion
         */
        public FileEntryPublisher(FileEntrySupplier<Part> entrySupplier, FileEntrySupplier.FileSyncConfFsync syncConfFsync) {
            this.entrySupplier = entrySupplier;
            this.syncConfFsync = syncConfFsync;
        }

        /**
         * Hands the subscriber a subscription that drives the supplier on demand.
         *
         * @param subscriber receiver of the parts and of the completion signal
         */
        @Override
        public void subscribe(final Subscriber<? super Part> subscriber) {
            final AtomicInteger index = new AtomicInteger(0);
            final AtomicReference<List<Part>> listRef = new AtomicReference<List<Part>>();
            final AtomicReference<Part> nextCompareRef = new AtomicReference<Part>();
            final AtomicLong fetchCount = new AtomicLong(0L);
            final AtomicLong sumCount = new AtomicLong(0L);
            final AtomicLong doneCount = new AtomicLong(0L);
            final long start = System.currentTimeMillis();
            Subscription subscription = new Subscription(){
                volatile boolean cancelled;

                /** Fetches the next batch, updates the counters and publishes it (never null) as the current batch. */
                private List<Part> fetch(Part cursor) {
                    List<Part> fetched = entrySupplier.get(cursor, syncConfFsync);
                    fetchCount.incrementAndGet();
                    List<Part> batch = fetched == null ? Collections.<Part>emptyList() : fetched;
                    sumCount.getAndAdd(batch.size());
                    listRef.set(batch);
                    return batch;
                }

                @Override
                public void request(long n) {
                    for (long i = 0L; i < n; ++i) {
                        if (this.cancelled) {
                            return;
                        }
                        if (listRef.get() == null) {
                            log.info("first, to fetch data from supplier: {}", entrySupplier.getClass().getName());
                            List<Part> first = this.fetch(null);
                            if (first.isEmpty()) {
                                if (!entrySupplier.continueGet(first.size())) {
                                    log.info("first,no data,exit. cost: {}", Util.costTime(start));
                                    break;
                                }
                                // An empty round still uses up one unit of demand.
                                log.info("continue fetching data later.");
                                continue;
                            }
                        }
                        List<Part> current = listRef.get();
                        if (index.get() >= current.size()) {
                            // Current batch is used up: ask whether to go on, then fetch from the cursor.
                            if (!entrySupplier.continueGet(current.size())) {
                                log.info("{},({}/{}),no data,exit. cost: {}", fetchCount.get(), doneCount.get(), sumCount.get(), Util.costTime(start));
                                break;
                            }
                            current = this.fetch(nextCompareRef.get());
                            log.info("{},({}/{}),to fetch data from supplier: {}", fetchCount.get(), doneCount.get(), sumCount.get(), entrySupplier.getClass().getName());
                            if (current.isEmpty()) {
                                if (!entrySupplier.continueGet(current.size())) {
                                    log.info("{},({}/{}),no data,exit. cost: {}", fetchCount.get(), doneCount.get(), sumCount.get(), Util.costTime(start));
                                    break;
                                }
                                log.info("continue fetching data later.");
                                continue;
                            }
                            index.set(0);
                        }
                        Part part = current.get(index.getAndIncrement());
                        // Keep the greatest part seen so far as the cursor for the next fetch.
                        Part cursor = nextCompareRef.get();
                        if (cursor == null || entrySupplier.compare(cursor, part) < 0) {
                            nextCompareRef.set(part);
                        }
                        if (this.cancelled) {
                            return;
                        }
                        subscriber.onNext(part);
                        doneCount.incrementAndGet();
                    }
                    if (this.cancelled) {
                        return;
                    }
                    // A failure thrown by the subscriber's onComplete is not rethrown; it is handed
                    // to the supplier so it can decide whether to record progress.
                    Throwable failure = null;
                    try {
                        subscriber.onComplete();
                    }
                    catch (Throwable throwable) {
                        failure = throwable;
                    }
                    entrySupplier.onComplete(nextCompareRef.get(), sumCount.get(), syncConfFsync, failure);
                }

                @Override
                public void cancel() {
                    this.cancelled = true;
                }
            };
            subscriber.onSubscribe(subscription);
        }
    }

    /**
     * Batch source consumed by {@link FileEntryPublisher}; also acts as the comparator used to
     * track the greatest element emitted so far.
     *
     * @param <T> element type produced by the supplier
     */
    public static interface FileEntrySupplier<T>
    extends Comparator<T> {
        /**
         * Fetches the next batch of elements.
         *
         * @param var1 cursor: the greatest element emitted so far, or {@code null} on the first fetch
         * @param var2 progress store
         * @return the next batch; the publisher treats {@code null} like an empty list
         */
        public List<T> get(T var1, FileSyncConfFsync var2);

        /**
         * Decides whether the publisher should keep fetching.
         *
         * @param var1 size of the batch most recently returned by {@link #get}
         * @return {@code false} to end the stream
         */
        public boolean continueGet(int var1);

        /** Orders two elements by delegating to {@code Util.asc}. */
        @Override
        default public int compare(T o1, T o2) {
            return Util.asc(o1, o2);
        }

        /**
         * Called by the publisher after the subscriber's {@code onComplete}; does nothing by default.
         *
         * @param nextCompare       greatest element emitted, or {@code null} if none was emitted
         * @param sum               total number of elements fetched from this supplier
         * @param fileSyncConfFsync progress store
         * @param t                 throwable raised by the subscriber's {@code onComplete}, or {@code null}
         */
        default public void onComplete(T nextCompare, long sum, FileSyncConfFsync fileSyncConfFsync, Throwable t) {
        }

        /**
         * Read-only view of a stored sync record.
         * NOTE(review): accessor meanings are inferred from their names and from the argument order
         * of {@code FileSyncConfFsync.fsync} call sites in this file - confirm against the implementation.
         */
        public static interface FileSyncConf {
            public String syncId();

            public String syncTime();

            public LocalDateTime taskTime();

            public String syncText();

            public Long syncCount();

            public String syncStatus();

            /** Position the file suppliers in this file resume reading from when it is greater than zero. */
            public long syncPosition();

            public String syncFile();

            public long lastModifiedTime();
        }

        /** Message context that carries a single text line. */
        public static class FileEntry
        extends Stream.MsgPerContextImpl {
            private final Stream.MsgPerContext.PerLogger perLogger = new Stream.MsgPerContext.PerLogger(){};
            private final String line;

            /**
             * @param seq  sequence identifier passed to the superclass
             * @param line text carried by this entry
             */
            public FileEntry(String seq, String line) {
                this(seq, null, line);
            }

            /**
             * @param seq                 sequence identifier passed to the superclass
             * @param clusterCacheService cache service passed to the superclass; may be {@code null}
             * @param line                text carried by this entry
             */
            @Deprecated
            public FileEntry(String seq, CacheService clusterCacheService, String line) {
                super(seq, clusterCacheService);
                this.line = line;
            }

            @Override
            public Stream.MsgPerContext.PerLogger logger() {
                return this.perLogger;
            }

            /** @return the text line given at construction */
            public String getLine() {
                return this.line;
            }
        }

        /** Hooks invoked immediately before and after each progress write; both do nothing by default. */
        public static interface FileSyncConfFsyncListener {
            /**
             * @param logFilePath path of the file being read
             * @param syncStatus  status about to be written ({@code "syncing"} or {@code "complete"} in this file)
             */
            default public void before(String logFilePath, String syncStatus) {
            }

            /**
             * @param logFilePath path of the file being read
             * @param syncStatus  status that was just written
             */
            default public void post(String logFilePath, String syncStatus) {
            }
        }

        /**
         * Store for read progress.
         *
         * @param <T> concrete sync record type
         */
        public static interface FileSyncConfFsync<T extends FileSyncConf> {
            /**
             * Records progress. Callers in this file pass, in order: the file path, the formatted
             * current time, {@code null}, the line count so far, the status, the read position, and
             * the file path again.
             * NOTE(review): this presumably maps to syncId, syncTime, syncText, syncCount,
             * syncStatus, syncPosition, syncFile - confirm against the implementation.
             */
            public void fsync(String var1, String var2, String var3, long var4, String var6, long var7, String var9);

            /**
             * Looks up the stored record; callers in this file pass the file path.
             *
             * @return the stored record, or {@code null} when there is none (callers null-check the result)
             */
            public T syncConf(String var1);

            /** @return {@code null} by default */
            default public String configPath(String syncId) {
                return null;
            }
        }
    }

    /** Policy deciding whether a file supplier should keep reading. */
    public static interface ContinueGet {
        /**
         * @param var1 number of lines returned by the most recent full batch
         * @param var2 the file being read
         * @param var3 last event recorded by the supplier ({@code Event.EOF} once end of file was hit), or {@code null}
         * @return {@code true} to keep fetching, {@code false} to stop
         */
        public boolean continueGet(int var1, File var2, Event var3);
    }

    /**
     * Supplier that reads a plain text file block by block through a {@link FileChannel} and
     * returns its lines in batches.
     *
     * <p>On the first fetch the read position is restored from the progress store; on every later
     * fetch, and on completion, the current channel position is written back.
     *
     * <p>NOTE(review): the recorded position is the raw channel position, which includes the bytes
     * of an unfinished trailing line still buffered in {@code previousPart}; resuming from it would
     * presumably start in the middle of that line - confirm whether this is intended.
     */
    static class FileEntrySupplierImpl
    implements FileEntrySupplier<Part>,
    Stream.DoFinally {
        private static final Logger log = LoggerFactory.getLogger(FileEntrySupplierImpl.class);
        private final String filePath;
        private final File file;
        private final FileChannel fileChannel;
        private final ByteBuffer sharedByteBuffer = ByteBuffer.allocate(0x100000);
        /** A fetch keeps reading blocks until it holds more than this many lines. */
        private final int lineCount = 1000;
        private final ContinueGet continueGet;
        private volatile Event event;
        private long lineSumCount;
        /** Bytes of the unfinished line at the end of the previous block. */
        private byte[] previousPart = new byte[0];
        private volatile int lastGetCount;
        private volatile boolean firstGet = true;
        private final FileEntrySupplier.FileSyncConfFsyncListener fileSyncConfFsyncListener;

        /**
         * @param filePath                  path of the file to read
         * @param continueGet               continuation policy; {@code null} means stop at end of file
         * @param fileSyncConfFsyncListener callbacks around progress writes; {@code null} means no callbacks
         * @throws IOException if the file cannot be opened
         */
        public FileEntrySupplierImpl(String filePath, ContinueGet continueGet, FileEntrySupplier.FileSyncConfFsyncListener fileSyncConfFsyncListener) throws IOException {
            this.filePath = filePath;
            this.file = new File(filePath);
            this.fileChannel = new FileInputStream(this.file).getChannel();
            this.continueGet = continueGet == null ? new EndAtEOF() : continueGet;
            this.fileSyncConfFsyncListener = fileSyncConfFsyncListener == null
                    ? new FileEntrySupplier.FileSyncConfFsyncListener(){}
                    : fileSyncConfFsyncListener;
        }

        /**
         * Reads blocks until more than {@code lineCount} lines are collected or end of file is hit.
         *
         * @param nextCompare       unused by this implementation
         * @param fileSyncConfFsync progress store
         * @return the collected lines; at end of file, the lines collected in this call plus the
         *         unterminated trailing line when the continuation policy says to stop
         */
        @Override
        public List<Part> get(Part nextCompare, FileEntrySupplier.FileSyncConfFsync fileSyncConfFsync) {
            long resumePosition = -1L;
            if (this.firstGet) {
                FileEntrySupplier.FileSyncConf fileSyncConf = fileSyncConfFsync.syncConf(this.filePath);
                if (fileSyncConf != null) {
                    resumePosition = fileSyncConf.syncPosition();
                }
                this.firstGet = false;
            } else {
                this.writeProgress(fileSyncConfFsync, this.lineSumCount, "syncing");
            }
            long blockPosition;
            try {
                if (resumePosition > 0L) {
                    log.info("set the read position: {},file: {}", resumePosition, this.filePath);
                    this.fileChannel.position(resumePosition);
                }
                blockPosition = this.fileChannel.position();
            }
            catch (IOException e) {
                throw Util.convert2RuntimeException(e);
            }
            List<Part> lineList = new ArrayList<Part>();
            do {
                if (this.readBlock(lineList, blockPosition)) {
                    continue;
                }
                // End of file: the counters are deliberately left untouched on this path.
                this.event = Event.EOF;
                if (this.previousPart.length > 0 && !this.continueGet(this.lastGetCount)) {
                    // No more data will come, so the unterminated tail is the last line.
                    String line = new String(this.previousPart, StandardCharsets.UTF_8);
                    lineList.add(new Part(line.trim(), blockPosition));
                }
                return lineList;
            } while (lineList.size() <= this.lineCount);
            log.info("get enough lines: {}", lineList.size());
            this.lineSumCount += (long)lineList.size();
            this.lastGetCount = lineList.size();
            return lineList;
        }

        /**
         * Reads one block, prepends the carried-over tail and splits the result into lines.
         *
         * @return {@code false} when the channel reported end of file and nothing was read
         */
        private boolean readBlock(List<Part> lineList, long blockPosition) {
            this.sharedByteBuffer.clear();
            try {
                if (this.fileChannel.read(this.sharedByteBuffer) == -1) {
                    return false;
                }
            }
            catch (IOException e) {
                throw Util.convert2RuntimeException(e);
            }
            this.sharedByteBuffer.flip();
            byte[] carried = this.previousPart;
            int readLength = this.sharedByteBuffer.remaining();
            byte[] bytes = new byte[carried.length + readLength];
            System.arraycopy(carried, 0, bytes, 0, carried.length);
            this.sharedByteBuffer.get(bytes, carried.length, readLength);
            this.previousPart = TextStream.lines(lineList, bytes, bytes.length, blockPosition);
            return true;
        }

        /** Writes the current channel position to the progress store, surrounded by the listener callbacks. */
        private void writeProgress(FileEntrySupplier.FileSyncConfFsync fileSyncConfFsync, long count, String status) {
            this.fileSyncConfFsyncListener.before(this.filePath, status);
            try {
                fileSyncConfFsync.fsync(this.filePath, Util.formatTime(LocalDateTime.now(), new String[0]), null, count, status, this.fileChannel.position(), this.filePath);
            }
            catch (IOException e) {
                throw Util.convert2RuntimeException(e);
            }
            this.fileSyncConfFsyncListener.post(this.filePath, status);
        }

        @Override
        public boolean continueGet(int lastGetCount) {
            return this.continueGet.continueGet(lastGetCount, this.file, this.event);
        }

        /**
         * Records the final position with status {@code "complete"}, unless the downstream
         * completion failed, in which case the failure is logged and nothing is written.
         */
        @Override
        public void onComplete(Part nextCompare, long sum, FileEntrySupplier.FileSyncConfFsync fileSyncConfFsync, Throwable t) {
            if (t != null) {
                log.error("encountered exception, cannot fsync config: " + this.filePath, t);
                return;
            }
            this.writeProgress(fileSyncConfFsync, sum, "complete");
        }

        /** Closes the file channel. */
        @Override
        public void runFinally(String type) {
            try {
                this.fileChannel.close();
                log.info("close io: {}", this.filePath);
            }
            catch (IOException e) {
                throw Util.convert2RuntimeException(e);
            }
        }
    }

    /**
     * Supplier that reads a gzip-compressed text file and returns its lines in batches.
     *
     * <p>Progress is tracked as the number of uncompressed bytes consumed. On the first fetch that
     * many bytes are skipped; on every later fetch, and on completion, the counter is written back.
     * Reading always stops at end of file.
     */
    static class GZFileEntrySupplierImpl
    implements FileEntrySupplier<Part>,
    Stream.DoFinally {
        private static final Logger log = LoggerFactory.getLogger(GZFileEntrySupplierImpl.class);
        /** Number of uncompressed bytes requested per read. */
        private static final int READ_BLOCK_SIZE = 0x100000;
        private final String filePath;
        private final File file;
        private final GZIPInputStream gzipInputStream;
        /** A fetch keeps reading blocks until it holds more than this many lines. */
        private final int lineCount = 1000;
        private final ContinueGet continueGet = new EndAtEOF();
        private volatile Event event;
        private long lineSumCount;
        /** Bytes of the unfinished line at the end of the previous block. */
        private byte[] previousPart = new byte[0];
        private volatile int lastGetCount;
        private volatile long readLength_GZIP;
        private volatile boolean firstGet = true;
        private final FileEntrySupplier.FileSyncConfFsyncListener fileSyncConfFsyncListener;

        /**
         * @param filePath                  path of the gzip file to read
         * @param fileSyncConfFsyncListener callbacks around progress writes; {@code null} means no callbacks
         * @throws IOException if the file cannot be opened or its gzip header cannot be read
         */
        public GZFileEntrySupplierImpl(String filePath, FileEntrySupplier.FileSyncConfFsyncListener fileSyncConfFsyncListener) throws IOException {
            this.filePath = filePath;
            this.file = new File(filePath);
            FileInputStream rawInput = new FileInputStream(this.file);
            GZIPInputStream gzip;
            try {
                // The GZIPInputStream constructor reads the header and may throw; do not leak the file handle.
                gzip = new GZIPInputStream(rawInput);
            }
            catch (IOException | RuntimeException e) {
                try {
                    rawInput.close();
                }
                catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
            this.gzipInputStream = gzip;
            this.fileSyncConfFsyncListener = fileSyncConfFsyncListener == null
                    ? new FileEntrySupplier.FileSyncConfFsyncListener(){}
                    : fileSyncConfFsyncListener;
        }

        /** Reads until {@code len} bytes are stored or the stream ends; returns the number of bytes stored. */
        private int readNBytes(byte[] b, int off, int len) throws IOException {
            int total = 0;
            while (total < len) {
                int count = this.gzipInputStream.read(b, off + total, len - total);
                if (count < 0) {
                    break;
                }
                total += count;
            }
            return total;
        }

        /**
         * Reads blocks until more than {@code lineCount} lines are collected or end of stream is hit.
         *
         * @param nextCompare       unused by this implementation
         * @param fileSyncConfFsync progress store
         * @return the collected lines; at end of stream, the lines collected in this call plus the
         *         unterminated trailing line
         */
        @Override
        public List<Part> get(Part nextCompare, FileEntrySupplier.FileSyncConfFsync fileSyncConfFsync) {
            long resumePosition = -1L;
            if (this.firstGet) {
                FileEntrySupplier.FileSyncConf fileSyncConf = fileSyncConfFsync.syncConf(this.filePath);
                if (fileSyncConf != null) {
                    resumePosition = fileSyncConf.syncPosition();
                }
                if (resumePosition > 0L) {
                    this.readLength_GZIP = resumePosition;
                }
                this.firstGet = false;
            } else {
                this.writeProgress(fileSyncConfFsync, this.lineSumCount, "syncing");
            }
            if (resumePosition > 0L) {
                try {
                    log.info("set the read position: {},file: {}", resumePosition, this.filePath);
                    long skip = this.gzipInputStream.skip(resumePosition);
                    log.info("read skip: {},file: {}", skip, this.filePath);
                }
                catch (EOFException e) {
                    this.event = Event.EOF;
                    return Collections.<Part>emptyList();
                }
                catch (IOException e) {
                    throw Util.convert2RuntimeException(e);
                }
            }
            long blockPosition = this.readLength_GZIP;
            List<Part> lineList = new ArrayList<Part>();
            do {
                if (this.readBlock(lineList, blockPosition)) {
                    continue;
                }
                // End of stream: the counters are deliberately left untouched on this path.
                this.event = Event.EOF;
                if (this.previousPart.length > 0 && !this.continueGet(this.lastGetCount)) {
                    String line = new String(this.previousPart, StandardCharsets.UTF_8);
                    lineList.add(new Part(line.trim(), blockPosition));
                }
                return lineList;
            } while (lineList.size() <= this.lineCount);
            log.info("get enough lines: {}", lineList.size());
            this.lineSumCount += (long)lineList.size();
            this.lastGetCount = lineList.size();
            return lineList;
        }

        /**
         * Reads one block behind the carried-over tail and splits the result into lines.
         *
         * @return {@code false} when no byte could be read (end of stream)
         */
        private boolean readBlock(List<Part> lineList, long blockPosition) {
            byte[] carried = this.previousPart;
            byte[] bytes = new byte[carried.length + READ_BLOCK_SIZE];
            int readLength;
            try {
                readLength = this.readNBytes(bytes, carried.length, READ_BLOCK_SIZE);
            }
            catch (IOException e) {
                throw Util.convert2RuntimeException(e);
            }
            if (readLength <= 0) {
                return false;
            }
            this.readLength_GZIP += (long)readLength;
            System.arraycopy(carried, 0, bytes, 0, carried.length);
            // Only the first carried.length + readLength bytes are valid; the rest is padding.
            this.previousPart = TextStream.lines(lineList, bytes, carried.length + readLength, blockPosition);
            return true;
        }

        /** Writes the consumed-byte counter to the progress store, surrounded by the listener callbacks. */
        private void writeProgress(FileEntrySupplier.FileSyncConfFsync fileSyncConfFsync, long count, String status) {
            this.fileSyncConfFsyncListener.before(this.filePath, status);
            fileSyncConfFsync.fsync(this.filePath, Util.formatTime(LocalDateTime.now(), new String[0]), null, count, status, this.readLength_GZIP, this.filePath);
            this.fileSyncConfFsyncListener.post(this.filePath, status);
        }

        @Override
        public boolean continueGet(int lastGetCount) {
            return this.continueGet.continueGet(lastGetCount, this.file, this.event);
        }

        /**
         * Records the final position with status {@code "complete"}, unless the downstream
         * completion failed, in which case the failure is logged and nothing is written
         * (same policy as {@link FileEntrySupplierImpl}).
         */
        @Override
        public void onComplete(Part nextCompare, long sum, FileEntrySupplier.FileSyncConfFsync fileSyncConfFsync, Throwable t) {
            if (t != null) {
                log.error("encountered exception, cannot fsync config: " + this.filePath, t);
                return;
            }
            this.writeProgress(fileSyncConfFsync, sum, "complete");
        }

        /** Closes the gzip stream (and with it the underlying file stream). */
        @Override
        public void runFinally(String type) {
            try {
                this.gzipInputStream.close();
                log.info("close io: {}", this.filePath);
            }
            catch (IOException e) {
                throw Util.convert2RuntimeException(e);
            }
        }
    }

    /**
     * Operator that merges consecutive {@link Part}s into {@link Line}s.
     *
     * <p>A part whose text is accepted by the predicate closes the line built so far (if it is not
     * empty) and starts a new one. A line holding more than {@code maxLineCount} parts aborts the
     * merge with a {@link RuntimeException}.
     */
    private static class MergeOperator
    extends Stream.StreamOperator<Part, Line> {
        private static final Logger log = LoggerFactory.getLogger(MergeOperator.class);
        private final NewLinePredicate newLinePredicate;
        private final int maxLineCount;

        /**
         * @param stream           upstream of parts
         * @param newLinePredicate decides whether a part's text begins a new line
         * @param maxLineCount     maximum number of parts allowed in one line
         */
        public MergeOperator(Stream<? extends Part> stream, NewLinePredicate newLinePredicate, int maxLineCount) {
            super(stream);
            this.newLinePredicate = newLinePredicate;
            this.maxLineCount = maxLineCount;
        }

        @Override
        Subscriber<? super Part> subscribeOrReturn(Subscriber<? super Line> actual) {
            return new MergeSubscriber(actual);
        }

        /** Sits between upstream and downstream: buffers parts into the current line and forwards demand and cancellation unchanged. */
        private class MergeSubscriber
        implements Subscriber<Part>,
        Subscription {
            final Subscriber<? super Line> actual;
            Subscription subscription;
            Line line = new Line();

            public MergeSubscriber(Subscriber<? super Line> actual) {
                this.actual = actual;
            }

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                this.actual.onSubscribe(this);
            }

            /**
             * Adds the part to the current line, emitting the previous line first when the part
             * starts a new one.
             *
             * @throws RuntimeException when the current line grows beyond {@code maxLineCount} parts
             */
            @Override
            public void onNext(Part record) {
                if (MergeOperator.this.newLinePredicate.test(record.getText()) && !this.line.isEmpty()) {
                    this.actual.onNext(this.line);
                    this.line = new Line();
                }
                this.line.add(record);
                int size = this.line.size();
                if (size > MergeOperator.this.maxLineCount) {
                    StringBuilder message = new StringBuilder("avoid OOM,exceed max line: ").append(size).append(">\r\n");
                    // Quote at most the first two parts; the line may hold a single part when maxLineCount < 1.
                    for (int i = 0; i < Math.min(2, size); ++i) {
                        message.append(this.line.get(i)).append("\r\n");
                    }
                    throw new RuntimeException(message.toString());
                }
            }

            @Override
            public void onError(Throwable t) {
                this.actual.onError(t);
            }

            /** Flushes the line still being built, then completes downstream. */
            @Override
            public void onComplete() {
                if (this.line != null && !this.line.isEmpty()) {
                    this.actual.onNext(this.line);
                    this.line = new Line();
                }
                this.actual.onComplete();
            }

            @Override
            public void request(long n) {
                this.subscription.request(n);
            }

            @Override
            public void cancel() {
                this.subscription.cancel();
            }
        }
    }

    /** Predicate over a part's text; {@code mergeLine} starts a new logical line when it returns {@code true}. */
    public static interface NewLinePredicate
    extends Predicate<String> {
    }

    /** Immutable pair of a text fragment and the block position it was read at. */
    public static class Part {
        private final String text;
        private final long blockPosition;

        /**
         * @param text          the text of this part; may be {@code null}
         * @param blockPosition position value associated with the text
         */
        public Part(String text, long blockPosition) {
            this.text = text;
            this.blockPosition = blockPosition;
        }

        /** @return the text given at construction */
        public String getText() {
            return this.text;
        }

        /** @return the block position given at construction */
        public long getBlockPosition() {
            return this.blockPosition;
        }

        /** Two parts are equal when both the block position and the text (null-safe) match. */
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Part)) {
                return false;
            }
            Part that = (Part)o;
            if (!that.canEqual(this) || this.getBlockPosition() != that.getBlockPosition()) {
                return false;
            }
            String mine = this.getText();
            String theirs = that.getText();
            if (mine == null) {
                return theirs == null;
            }
            return mine.equals(theirs);
        }

        /** Lets subclasses veto equality with instances of other types. */
        protected boolean canEqual(Object other) {
            return other instanceof Part;
        }

        /** Combines the position hash and the text hash with multiplier 59 (43 stands in for a null text). */
        public int hashCode() {
            final int prime = 59;
            int hash = prime + Long.hashCode(this.getBlockPosition());
            String value = this.getText();
            int textHash = value == null ? 43 : value.hashCode();
            return hash * prime + textHash;
        }

        public String toString() {
            StringBuilder out = new StringBuilder("TextStream.Part(text=");
            out.append(this.getText());
            out.append(", blockPosition=");
            out.append(this.getBlockPosition());
            out.append(")");
            return out.toString();
        }
    }

    /** Continuation policy that keeps reading until the supplier has recorded end of file. */
    static class EndAtEOF
    implements ContinueGet {
        EndAtEOF() {
        }

        /**
         * @param lastGetCount ignored
         * @param file         ignored
         * @param event        last event recorded by the supplier; may be {@code null}
         * @return {@code false} once {@code event} is {@link Event#EOF}, {@code true} otherwise
         */
        @Override
        public boolean continueGet(int lastGetCount, File file, Event event) {
            boolean reachedEnd = event == Event.EOF;
            return !reachedEnd;
        }
    }

    /** A logical line: the ordered list of parts merged into it. */
    public static class Line
    extends ArrayList<Part> {
        /** Joins parts with {@code split}, stopping once more than 1 KiB of text has been appended. */
        public String limit_1KB(String split) {
            return this.limit_any(1L << 10, split);
        }

        /** Joins parts with {@code split}, stopping once more than 1 MiB of text has been appended. */
        public String limit_1MB(String split) {
            return this.limit_any(1L << 20, split);
        }

        /** Joins parts with {@code split}, stopping once more than 10 MiB of text has been appended. */
        public String limit_10MB(String split) {
            return this.limit_any(10L << 20, split);
        }

        /**
         * Appends each part's text followed by {@code split}. The part that pushes the running text
         * length (separators not counted) above {@code length} is still appended in full; parts
         * after it are dropped.
         */
        private String limit_any(long length, String split) {
            StringBuilder joined = new StringBuilder();
            long consumed = 0L;
            for (Part part : this) {
                String text = part.getText();
                joined.append(text).append(split);
                consumed += (long)text.length();
                if (consumed > length) {
                    break;
                }
            }
            return joined.toString();
        }
    }

    /** Events a file supplier records and passes to {@link ContinueGet}. */
    public static enum Event {
        /** Set by the suppliers when a read reports that no more data is available. */
        EOF;

    }
}

