/*
 * Decompiled with CFR 0.152.
 */
package com.ovopark.kernel.shared.stream;

import com.ovopark.kernel.shared.Config;
import com.ovopark.kernel.shared.Util;
import com.ovopark.kernel.shared.kv.CacheService;
import com.ovopark.kernel.shared.stream.CoreSubscriber;
import com.ovopark.kernel.shared.stream.Publisher;
import com.ovopark.kernel.shared.stream.Subscriber;
import com.ovopark.kernel.shared.stream.Subscription;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.event.Level;

public abstract class Stream<T>
implements Publisher<T> {
    /** Class-level SLF4J logger; used e.g. to report failures thrown by {@link DoFinally} callbacks. */
    private static final Logger log = LoggerFactory.getLogger(Stream.class);
    /**
     * Key name resolved once at class initialization from configuration: option
     * {@code shared.jdk8.module.traceId}, then {@code spring.application.name.traceId}, then the
     * literal {@code "unnamed"} (presumably the second argument of {@code getString} is the
     * default value - confirm against {@code Config}).
     * The value is used as an MDC key when logging context is copied to an executor thread.
     */
    public static final String _trace_id_static_ = Config.ConfigPriority.option().getString("shared.jdk8.module.traceId", Config.ConfigPriority.option().getString("spring.application.name.traceId", "unnamed"));

    /**
     * Wraps an arbitrary {@link Publisher} in a {@link Stream} so operators can be chained on it.
     *
     * @param publisher the publisher that will eventually receive the assembled subscriber
     * @param <T> element type
     * @return a stream backed by {@code publisher}
     */
    public static <T> Stream<T> from(Publisher<T> publisher) {
        StreamSource<T> wrapped = new StreamSource<T>(publisher);
        return wrapped;
    }

    /**
     * Creates a stream that emits the elements produced by the given iterator.
     *
     * @param iterator source of elements; consumed as the subscriber requests data
     * @param <T> element type
     * @return a stream backed by an {@link IteratorPublisher} over {@code iterator}
     */
    public static <T> Stream<T> from(Iterator<T> iterator) {
        // The publisher is parameterized by the element type T (not Iterator<T>) so that the
        // Iterator-accepting constructor is selected and the result really is a Stream<T>.
        IteratorPublisher<T> iteratorPublisher = new IteratorPublisher<T>(iterator);
        return Stream.from(iteratorPublisher);
    }

    /**
     * Creates a stream that emits the elements of the given list in iteration order.
     *
     * @param list source list; its iterator is obtained immediately, so {@code null} fails here
     * @param <T> element type
     * @return a stream backed by an {@link IteratorPublisher} over {@code list.iterator()}
     */
    public static <T> Stream<T> from(List<T> list) {
        // Parameterize by the element type T (not Iterator<T>) so the Iterator-accepting
        // constructor is selected and the result really is a Stream<T>.
        IteratorPublisher<T> iteratorPublisher = new IteratorPublisher<T>(list.iterator());
        return Stream.from(iteratorPublisher);
    }

    /**
     * Creates a stream that emits exactly one element.
     *
     * @param one the single element to emit
     * @param <T> element type
     * @return a stream backed by a single-element {@link IteratorPublisher}
     */
    public static <T> Stream<T> from(T one) {
        return Stream.from(new IteratorPublisher<T>(one));
    }

    /**
     * Creates a stream that pages through a {@link DBEntrySupplier} without a sync-configuration handle.
     *
     * @param dbEntrySupplier supplier that returns successive batches of entries
     * @param <T> element type
     * @return a stream over the supplier's entries; the supplier receives {@code null} as its
     *         {@code SyncConfFsync} argument
     */
    public static <T> Stream<T> from(DBEntrySupplier<T> dbEntrySupplier) {
        Stream<T> stream = Stream.from(dbEntrySupplier, null);
        return stream;
    }

    /**
     * Creates a stream that pages through a {@link DBEntrySupplier}.
     *
     * @param dbEntrySupplier supplier that returns successive batches of entries
     * @param syncConfFsync handle passed through unchanged to every supplier call; may be {@code null}
     * @param <T> element type
     * @return a stream backed by a {@link DBEntryPublisher}
     */
    public static <T> Stream<T> from(DBEntrySupplier<T> dbEntrySupplier, DBEntrySupplier.SyncConfFsync syncConfFsync) {
        return Stream.from(new DBEntryPublisher<T>(dbEntrySupplier, syncConfFsync));
    }

    /**
     * Returns a stream whose elements are the results of applying {@code mapper} to each element
     * of this stream. An exception thrown by the mapper is delivered to the subscriber's
     * {@code onError} and the offending element is not forwarded.
     *
     * @param mapper transformation applied to every element
     * @param <R> element type of the resulting stream
     * @return the mapped stream
     */
    public final <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
        // Construct with the concrete type arguments and return directly: a wildcard-typed local
        // (MapOperator<? super T, ? extends R>) is not assignable to Stream<R>.
        return new MapOperator<T, R>(this, mapper);
    }

    /**
     * Returns a stream that forwards only the elements for which {@code filter} returns
     * {@code true}. An exception thrown by the predicate is delivered to the subscriber's
     * {@code onError} and the offending element is not forwarded.
     *
     * @param filter predicate deciding whether an element is forwarded
     * @return the filtered stream
     */
    public final Stream<T> filter(Predicate<? super T> filter) {
        // Construct with the concrete type argument and return directly: a wildcard-typed local
        // (FilterOperator<? super T>) is not assignable to Stream<T>.
        return new FilterOperator<T>(this, filter);
    }

    /**
     * Adds a logging stage that writes every signal at {@link Level#INFO}.
     *
     * @param logger destination logger
     * @param entryLogger renders each element for the {@code onNext} log line
     * @return a stream that logs and forwards all signals
     */
    public final Stream<T> log(Logger logger, EntryLogger entryLogger) {
        Level defaultLevel = Level.INFO;
        return this.log(logger, defaultLevel, entryLogger);
    }

    /**
     * Adds a logging stage at {@link Level#INFO} that renders elements with {@link String#valueOf(Object)}.
     *
     * @param logger destination logger
     * @return a stream that logs and forwards all signals
     */
    public final Stream<T> log(Logger logger) {
        EntryLogger asString = String::valueOf;
        return this.log(logger, asString);
    }

    /**
     * Adds a logging stage that writes every signal (subscribe, next, error, complete, request,
     * cancel) to {@code logger} at the given level and then forwards it unchanged.
     *
     * @param logger destination logger
     * @param level level to log at; a level other than DEBUG/INFO/TRACE/WARN/ERROR logs nothing
     * @param entryLogger renders each element for the {@code onNext} log line
     * @return a stream that logs and forwards all signals
     */
    public final Stream<T> log(Logger logger, Level level, EntryLogger entryLogger) {
        // Parameterized instead of raw so the returned stream is type-checked as Stream<T>.
        return new LoggerOperator<T>(this, logger, level, entryLogger);
    }

    /**
     * Makes the subscription to the upstream part of this stream run on the given executor.
     *
     * @param streamExecutor executor on which the upstream is subscribed; when {@code null}
     *        this stream is returned unchanged
     * @return a stream whose upstream subscription is submitted to {@code streamExecutor},
     *         or {@code this} when no executor is given
     */
    public final Stream<T> subscribeOn(StreamExecutor streamExecutor) {
        if (streamExecutor == null) {
            return this;
        }
        // Parameterized instead of raw so the returned stream is type-checked as Stream<T>.
        return new SubscribeOn<T>(this, streamExecutor);
    }

    /**
     * Subscribes with a plain callback: every element is handed to {@code consumer}; an error
     * signal is rethrown as a runtime exception via {@code Util.convert2RuntimeException}.
     *
     * @param consumer receives each emitted element
     */
    @Override
    public final void subscribe(final Consumer<? super T> consumer) {
        CoreSubscriber<T> adapter = new CoreSubscriber<T>(){

            @Override
            public void onNext(T item) {
                consumer.accept(item);
            }

            @Override
            public void onError(Throwable failure) {
                throw Util.convert2RuntimeException(failure);
            }
        };
        this.subscribe(adapter);
    }

    /**
     * Assembles the subscriber chain and subscribes it to the source publisher.
     *
     * <p>Starting from this stream, each {@link StreamOperator} wraps the current subscriber via
     * {@code subscribeOrReturn} and the walk continues with the operator's upstream until an
     * operator has no upstream operator; that operator's {@code source()} becomes the publisher.
     * If any operator returns {@code null} from {@code subscribeOrReturn}, this method returns
     * immediately (the callbacks collected so far are NOT run in that case).
     * Subscribers that also implement {@link DoFinally} are collected and invoked with
     * {@code "ANY"} after the publisher's subscribe call returns or throws; failures of those
     * callbacks are logged and do not stop the remaining callbacks.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param actual the downstream subscriber supplied by the caller
     */
    @Override
    public final void subscribe(Subscriber<? super T> actual) {
        Publisher<Object> publisher = this;
        Subscriber<Object> subscriber = actual;
        ArrayList<DoFinally> doFinallyList = new ArrayList<DoFinally>();
        if (this instanceof StreamOperator) {
            StreamOperator<?, ? super T> upStreamOperator = (StreamOperator<?, ? super T>)this;
            while (true) {
                StreamOperator<?, ? super T> nextStreamOperator;
                // A null result means the operator took over the subscription itself.
                if ((subscriber = upStreamOperator.subscribeOrReturn(subscriber)) == null) {
                    return;
                }
                if (subscriber instanceof DoFinally) {
                    doFinallyList.add((DoFinally)((Object)subscriber));
                }
                // No further operator upstream: the remaining upstream is the source publisher.
                if ((nextStreamOperator = upStreamOperator.nextStreamOperator()) == null) {
                    publisher = upStreamOperator.source();
                    break;
                }
                upStreamOperator = nextStreamOperator;
            }
        }
        try {
            // StreamSource is subscribed through subscribe0, which hands the subscriber to the wrapped publisher.
            if (publisher instanceof StreamSource) {
                ((StreamSource)publisher).subscribe0(subscriber);
            } else {
                publisher.subscribe(subscriber);
            }
        }
        finally {
            // Run every collected callback; one failing callback must not prevent the others.
            for (DoFinally doFinally : doFinallyList) {
                try {
                    doFinally.runFinally("ANY");
                }
                catch (Throwable e) {
                    log.error(e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Adds an error-handling stage using {@link OnError#STOP}: an error is forwarded to the
     * subscriber and the upstream subscription is then cancelled.
     *
     * @return a stream with the STOP error policy applied
     */
    public final Stream<T> onErrorStop() {
        // Parameterized instead of raw so the returned stream is type-checked as Stream<T>.
        return new OnErrorOperator<T>(this, OnError.STOP);
    }

    /**
     * Adds an error-handling stage using {@link OnError#CONTINUE}: an error is forwarded to the
     * subscriber, an {@code Exception} thrown by that handler is logged, and the upstream is
     * not cancelled.
     *
     * @return a stream with the CONTINUE error policy applied
     */
    public final Stream<T> onErrorContinue() {
        // Parameterized instead of raw so the returned stream is type-checked as Stream<T>.
        return new OnErrorOperator<T>(this, OnError.CONTINUE);
    }

    /**
     * Adds a stage that groups elements of this stream into lists.
     *
     * @param partition value handed to the partition operator (presumably the maximum list
     *        size - confirm against {@code PartitionOperator})
     * @return a stream of element lists
     */
    public final Stream<List<T>> partition(int partition) {
        // Parameterized instead of raw so the result is type-checked as Stream<List<T>>.
        return new PartitionOperator<T>(this, partition);
    }

    /**
     * Adds a {@code CancelOnMaxOperator} stage configured with {@code max}.
     *
     * @param max limit handed to the operator (presumably the number of elements after which
     *        the subscription is cancelled - confirm against {@code CancelOnMaxOperator})
     * @return the resulting stream
     */
    public final Stream<T> cancelOnMax(int max) {
        return new CancelOnMaxOperator(this, max);
    }

    /**
     * Adds a finalization stage; equivalent to {@code doFinally(doFinally, true)}.
     *
     * @param doFinally callback to register
     * @return the resulting stream
     */
    public final Stream<T> doFinally(DoFinally doFinally) {
        final boolean consumer = true;
        return this.doFinally(doFinally, consumer);
    }

    /**
     * Adds a {@code DoFinallyOperator} stage wrapping the given callback.
     *
     * @param doFinally callback to register
     * @param consumer flag passed through to the operator (meaning not visible here - see
     *        {@code DoFinallyOperator})
     * @return the resulting stream
     */
    public final Stream<T> doFinally(DoFinally doFinally, boolean consumer) {
        return new DoFinallyOperator(this, doFinally, consumer);
    }

    /**
     * Root of an operator chain: a {@link Stream} that simply delegates to a wrapped
     * {@link Publisher}.
     */
    private static class StreamSource<T>
    extends Stream<T> {
        /** The wrapped publisher that receives the fully assembled subscriber. */
        final Publisher<? extends T> publisher;

        public StreamSource(Publisher<? extends T> source) {
            this.publisher = source;
        }

        /**
         * Subscribes {@code downstream} directly to the wrapped publisher.
         */
        public final void subscribe0(Subscriber<? super T> downstream) {
            Publisher<? extends T> upstream = this.publisher;
            upstream.subscribe(downstream);
        }
    }

    /**
     * {@link Publisher} that emits the elements of an {@link Iterator}.
     *
     * <p>Each {@code request(n)} call emits up to {@code n} elements and then, unless the
     * subscription was cancelled, signals {@code onComplete} - both when the iterator is
     * exhausted and when {@code n} elements have been delivered.
     */
    static class IteratorPublisher<T>
    implements Publisher<T> {
        private static final Logger log = LoggerFactory.getLogger(IteratorPublisher.class);
        /** Source of elements; shared by every subscription created from this publisher. */
        protected final Iterator<T> iterator;

        /**
         * @param iterator source of the elements to emit
         */
        public IteratorPublisher(Iterator<T> iterator) {
            this.iterator = iterator;
        }

        /**
         * Creates a publisher that emits exactly one element.
         *
         * @param one the single element to emit
         */
        public IteratorPublisher(T one) {
            this(Arrays.asList(one).iterator());
        }

        /**
         * Hands a new subscription to {@code subscriber}; elements are only pulled from the
         * iterator when the subscriber calls {@code request}.
         */
        @Override
        public void subscribe(final Subscriber<? super T> subscriber) {
            Subscription subscription = new Subscription(){
                /** Set by {@link #cancel()}; checked before every read and every signal. */
                volatile boolean cancelled;

                @Override
                public void request(long n) {
                    for (long i = 0L; i < n; ++i) {
                        if (this.cancelled) {
                            return;
                        }
                        if (!iterator.hasNext()) break;
                        // Typed as T (not Object) so the element can be passed to
                        // Subscriber<? super T>.onNext without an unchecked or invalid conversion.
                        T message = iterator.next();
                        // Re-check: cancel() may have been called while the element was being read.
                        if (this.cancelled) {
                            return;
                        }
                        subscriber.onNext(message);
                    }
                    if (this.cancelled) {
                        return;
                    }
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    this.cancelled = true;
                }
            };
            subscriber.onSubscribe(subscription);
        }
    }

    /**
     * Batch-wise data source consumed by {@code DBEntryPublisher}. The supplier also acts as the
     * {@link Comparator} used to track the greatest element emitted so far, which is handed back
     * on the next fetch.
     */
    public static interface DBEntrySupplier<T>
    extends Comparator<T> {
        /**
         * Fetches the next batch.
         *
         * @param var1 {@code null} on the first fetch; afterwards the greatest element (per
         *        {@link #compare}) emitted so far
         * @param var2 the sync handle given when the stream was created; may be {@code null}
         * @return the batch; {@code null} or an empty list ends the stream
         */
        public List<T> get(T var1, SyncConfFsync var2);

        /**
         * Asked once the current batch has been fully emitted.
         *
         * @param var1 size of the batch that was just consumed
         * @return {@code true} to fetch another batch, {@code false} to end the stream
         */
        public boolean continueGet(int var1);

        /** Default ordering delegates to {@code Util.asc} (presumably ascending natural order - confirm). */
        @Override
        default public int compare(T o1, T o2) {
            return Util.asc(o1, o2);
        }

        /**
         * Called by the publisher after it has signalled completion downstream. No-op by default.
         *
         * @param nextCompare greatest element emitted so far, or {@code null} if none was emitted
         * @param sum total number of elements fetched from this supplier
         * @param syncConfFsync the sync handle given when the stream was created; may be {@code null}
         * @param t throwable raised by the subscriber's {@code onComplete}, or {@code null}
         */
        default public void onComplete(T nextCompare, long sum, SyncConfFsync syncConfFsync, Throwable t) {
        }

        /**
         * Read-only view of a synchronization record. The meaning of the individual fields is
         * not visible in this file; the names suggest sync progress bookkeeping (to be confirmed).
         */
        public static interface SyncConf {
            public String syncId();

            public String syncTime();

            public String syncPk();

            public LocalDateTime taskTime();

            public String syncText();

            public Integer syncCount();

            public String syncStatus();

            /** Version of the record; {@code -1} by default. */
            default public long ver() {
                return -1L;
            }
        }

        /**
         * Handle for reading and persisting a {@link SyncConf}.
         */
        public static interface SyncConfFsync<T extends SyncConf> {
            /**
             * Persists sync state. NOTE(review): parameters presumably mirror
             * {@link #fsyncWithVcc} without the version (syncId, syncTime, syncPk, syncCount,
             * syncStatus) - confirm against an implementation.
             */
            public void fsync(String var1, String var2, String var3, int var4, String var5);

            /**
             * Versioned variant of {@link #fsync}; the default implementation does nothing and
             * returns {@code -1}.
             */
            default public long fsyncWithVcc(long ver, String syncId, String syncTime, String syncPk, int syncCount, String syncStatus) {
                return -1L;
            }

            /** Looks up a sync record by the given key (presumably the sync id - confirm). */
            public T syncConf(String var1);
        }
    }

    /**
     * {@link Publisher} that pulls batches from a {@link DBEntrySupplier} and emits their
     * elements one at a time.
     *
     * <p>Each {@code request(n)} call emits up to {@code n} elements, fetching further batches
     * as needed, and then - unless cancelled - signals {@code onComplete} downstream and
     * notifies the supplier via {@code DBEntrySupplier.onComplete}. This happens both when the
     * data is exhausted and when {@code n} elements have been delivered.
     */
    static final class DBEntryPublisher<T>
    implements Publisher<T> {
        private static final Logger log = LoggerFactory.getLogger(DBEntryPublisher.class);
        private final DBEntrySupplier<T> entrySupplier;
        /** Passed unchanged to every supplier call; may be {@code null}. */
        public final DBEntrySupplier.SyncConfFsync syncConfFsync;

        public DBEntryPublisher(DBEntrySupplier<T> entrySupplier, DBEntrySupplier.SyncConfFsync syncConfFsync) {
            this.entrySupplier = entrySupplier;
            this.syncConfFsync = syncConfFsync;
        }

        /**
         * Creates the per-subscription paging state and hands a subscription to {@code subscriber}.
         * No data is fetched until the subscriber calls {@code request}.
         */
        @Override
        public void subscribe(final Subscriber<? super T> subscriber) {
            // Position of the next element to emit within the current batch.
            final AtomicInteger index = new AtomicInteger(0);
            // Current batch; null until the first fetch has happened.
            final AtomicReference listRef = new AtomicReference();
            // Greatest element emitted so far according to entrySupplier.compare; used as the cursor for the next fetch.
            final AtomicReference nextCompareRef = new AtomicReference();
            // Number of supplier fetches, number of elements fetched, number of elements emitted.
            final AtomicLong fetchCount = new AtomicLong(0L);
            final AtomicLong sumCount = new AtomicLong(0L);
            final AtomicLong doneCount = new AtomicLong(0L);
            final long start = System.currentTimeMillis();
            Subscription subscription = new Subscription(){
                volatile boolean cancelled;

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 * (CFR decompiler note: the original try/catch structure may have differed.)
                 */
                @Override
                public void request(long n) {
                    for (long i = 0L; i < n; ++i) {
                        List<Object> list;
                        if (this.cancelled) {
                            return;
                        }
                        // First fetch: no cursor yet, so the supplier is called with null.
                        if (listRef.get() == null) {
                            log.info("first, to fetch data from supplier: " + entrySupplier.getClass().getName());
                            list = entrySupplier.get(null, syncConfFsync);
                            fetchCount.incrementAndGet();
                            sumCount.getAndAdd(list == null ? 0L : (long)list.size());
                            listRef.set(list == null ? Collections.EMPTY_LIST : list);
                            if (list == null || list.isEmpty()) {
                                log.info("first,no data,exit. cost: " + Util.costTime(start));
                                break;
                            }
                        }
                        // Current batch fully emitted: ask the supplier whether to fetch another one.
                        if (index.get() >= ((List)listRef.get()).size()) {
                            if (!entrySupplier.continueGet(((List)listRef.get()).size())) {
                                log.info(fetchCount.get() + ",(" + doneCount.get() + "/" + sumCount.get() + "),no data,exit. cost: " + Util.costTime(start));
                                break;
                            }
                            list = entrySupplier.get(nextCompareRef.get(), syncConfFsync);
                            fetchCount.incrementAndGet();
                            sumCount.getAndAdd(list == null ? 0L : (long)list.size());
                            log.info(fetchCount.get() + ",(" + doneCount.get() + "/" + sumCount.get() + "),to fetch data from supplier: " + entrySupplier.getClass().getName());
                            listRef.set(list == null ? Collections.EMPTY_LIST : list);
                            if (list == null || list.isEmpty()) {
                                log.info(fetchCount.get() + ",(" + doneCount.get() + "/" + sumCount.get() + "),no data,exit. cost: " + Util.costTime(start));
                                break;
                            }
                            index.set(0);
                        }
                        int l = index.getAndIncrement();
                        Object t = ((List)listRef.get()).get(l);
                        // Keep the cursor at the greatest element seen so far.
                        if (nextCompareRef.get() == null) {
                            nextCompareRef.set(t);
                        } else if (entrySupplier.compare(nextCompareRef.get(), t) < 0) {
                            nextCompareRef.set(t);
                        }
                        if (this.cancelled) {
                            return;
                        }
                        subscriber.onNext(t);
                        doneCount.incrementAndGet();
                    }
                    if (this.cancelled) {
                        return;
                    }
                    // A failure of the downstream onComplete is captured and reported to the
                    // supplier; it is not rethrown in this (decompiled) form.
                    Throwable t = null;
                    try {
                        subscriber.onComplete();
                    }
                    catch (Throwable throwable) {
                        t = throwable;
                    }
                    finally {
                        entrySupplier.onComplete(nextCompareRef.get(), sumCount.get(), syncConfFsync, t);
                    }
                }

                @Override
                public void cancel() {
                    this.cancelled = true;
                }
            };
            subscriber.onSubscribe(subscription);
        }
    }

    /**
     * Operator that transforms every upstream element with a mapping function before passing
     * it downstream.
     */
    private static class MapOperator<T, R>
    extends StreamOperator<T, R> {
        private static final Logger log = LoggerFactory.getLogger(MapOperator.class);
        /** Transformation applied to every element. */
        final Function<? super T, ? extends R> mapper;

        public MapOperator(Stream<? extends T> stream, Function<? super T, ? extends R> mapper) {
            super(stream);
            this.mapper = mapper;
        }

        /**
         * Wraps the downstream subscriber in a subscriber that applies the mapper.
         */
        @Override
        Subscriber<? super T> subscribeOrReturn(Subscriber<? super R> actual) {
            return new MapSubscriber<T, R>(actual, this.mapper);
        }

        /**
         * Subscriber/subscription pair sitting between upstream and downstream.
         *
         * <p>Declared static with its own type parameter names: it needs nothing from the
         * enclosing operator (the mapper is passed in), so it should neither capture the
         * enclosing instance nor shadow the operator's {@code T}/{@code R}.
         *
         * @param <I> upstream element type
         * @param <O> downstream element type
         */
        private static final class MapSubscriber<I, O>
        implements Subscriber<I>,
        Subscription {
            final Subscriber<? super O> actual;
            final Function<? super I, ? extends O> mapper;
            /** Upstream subscription; set in {@link #onSubscribe}. */
            Subscription subscription;

            public MapSubscriber(Subscriber<? super O> actual, Function<? super I, ? extends O> mapper) {
                this.actual = actual;
                this.mapper = mapper;
            }

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                // Downstream talks to this object so request/cancel pass through it.
                this.actual.onSubscribe(this);
            }

            @Override
            public void onNext(I record) {
                O v;
                try {
                    v = this.mapper.apply(record);
                }
                catch (Throwable e) {
                    // A mapper failure becomes an error signal; the element is dropped.
                    this.onError(e);
                    return;
                }
                this.actual.onNext(v);
            }

            @Override
            public void onError(Throwable t) {
                this.actual.onError(t);
            }

            @Override
            public void onComplete() {
                this.actual.onComplete();
            }

            @Override
            public void request(long n) {
                this.subscription.request(n);
            }

            @Override
            public void cancel() {
                this.subscription.cancel();
            }
        }
    }

    /**
     * Operator that forwards only the upstream elements accepted by a predicate.
     */
    private static class FilterOperator<T>
    extends StreamOperator<T, T> {
        private static final Logger log = LoggerFactory.getLogger(FilterOperator.class);
        /** Decides whether an element is forwarded downstream. */
        final Predicate<? super T> filter;

        public FilterOperator(Stream<? extends T> stream, Predicate<? super T> filter) {
            super(stream);
            this.filter = filter;
        }

        /**
         * Wraps the downstream subscriber in a subscriber that applies the predicate.
         */
        @Override
        Subscriber<? super T> subscribeOrReturn(Subscriber<? super T> actual) {
            return new FilterSubscriber<T>(actual, this.filter);
        }

        /**
         * Subscriber/subscription pair sitting between upstream and downstream.
         *
         * <p>Declared static with its own type parameter name: it needs nothing from the
         * enclosing operator (the predicate is passed in), so it should neither capture the
         * enclosing instance nor shadow the operator's {@code T}.
         *
         * @param <E> element type
         */
        private static final class FilterSubscriber<E>
        implements Subscriber<E>,
        Subscription {
            final Subscriber<? super E> actual;
            final Predicate<? super E> filter;
            /** Upstream subscription; set in {@link #onSubscribe}. */
            Subscription subscription;

            public FilterSubscriber(Subscriber<? super E> actual, Predicate<? super E> filter) {
                this.actual = actual;
                this.filter = filter;
            }

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                // Downstream talks to this object so request/cancel pass through it.
                this.actual.onSubscribe(this);
            }

            @Override
            public void onNext(E record) {
                boolean flag;
                try {
                    flag = this.filter.test(record);
                }
                catch (Throwable e) {
                    // A predicate failure becomes an error signal; the element is dropped.
                    this.onError(e);
                    return;
                }
                if (flag) {
                    this.actual.onNext(record);
                }
            }

            @Override
            public void onError(Throwable t) {
                this.actual.onError(t);
            }

            @Override
            public void onComplete() {
                this.actual.onComplete();
            }

            @Override
            public void request(long n) {
                this.subscription.request(n);
            }

            @Override
            public void cancel() {
                this.subscription.cancel();
            }
        }
    }

    /**
     * Renders a stream element as text for the logging operator's {@code onNext} line.
     */
    public static interface EntryLogger {
        /**
         * @param var1 the element about to be forwarded downstream
         * @return the text to append to the log line
         */
        public String log(Object var1);
    }

    /**
     * Pass-through operator that writes one line to the supplied logger for every signal
     * travelling in either direction (subscribe, next, error, complete, request, cancel) and
     * then forwards the signal unchanged.
     */
    private static class LoggerOperator<T>
    extends StreamOperator<T, T> {
        private static final Logger log = LoggerFactory.getLogger(LoggerOperator.class);
        private final Logger logger;
        private final Level level;
        private final EntryLogger entryLogger;

        public LoggerOperator(Stream<? extends T> stream, Logger logger, Level level, EntryLogger entryLogger) {
            super(stream);
            this.logger = logger;
            this.level = level;
            this.entryLogger = entryLogger;
        }

        @Override
        Subscriber<? super T> subscribeOrReturn(Subscriber<? super T> actual) {
            LoggerSubscriber<T> tap = new LoggerSubscriber<T>(actual);
            return tap;
        }

        /** Logs each signal, then delegates to the downstream subscriber / upstream subscription. */
        private class LoggerSubscriber<T>
        implements Subscriber<T>,
        Subscription {
            final Subscriber<? super T> actual;
            Subscription subscription;

            public LoggerSubscriber(Subscriber<? super T> actual) {
                this.actual = actual;
            }

            @Override
            public void onSubscribe(Subscription subscription) {
                String downstreamName = this.actual.getClass().getSimpleName();
                this.log(downstreamName + " onSubscribe: " + subscription.getClass().getName());
                this.subscription = subscription;
                this.actual.onSubscribe(this);
            }

            @Override
            public void onNext(T record) {
                String downstreamName = this.actual.getClass().getSimpleName();
                String rendered = LoggerOperator.this.entryLogger.log(record);
                this.log(downstreamName + " onNext: " + rendered);
                this.actual.onNext(record);
            }

            /**
             * Writes {@code record}, prefixed with the current thread name, at the configured
             * level. Nothing is written when the level is not one of the five SLF4J levels.
             */
            private void log(Object record) {
                Level configured = LoggerOperator.this.level;
                if (configured == null) {
                    return;
                }
                String line = "log:" + Thread.currentThread().getName() + " > " + String.valueOf(record);
                Logger target = LoggerOperator.this.logger;
                switch (configured) {
                    case DEBUG:
                        target.debug(line);
                        break;
                    case INFO:
                        target.info(line);
                        break;
                    case TRACE:
                        target.trace(line);
                        break;
                    case WARN:
                        target.warn(line);
                        break;
                    case ERROR:
                        target.error(line);
                        break;
                    default:
                        break;
                }
            }

            @Override
            public void onError(Throwable t) {
                String downstreamName = this.actual.getClass().getSimpleName();
                this.log(downstreamName + " onError: " + String.valueOf(t));
                this.actual.onError(t);
            }

            @Override
            public void onComplete() {
                String downstreamName = this.actual.getClass().getSimpleName();
                this.log(downstreamName + " onComplete");
                this.actual.onComplete();
            }

            @Override
            public void request(long n) {
                String upstreamName = this.subscription.getClass().getSimpleName();
                this.log(upstreamName + " request: " + n);
                this.subscription.request(n);
            }

            @Override
            public void cancel() {
                String upstreamName = this.subscription.getClass().getSimpleName();
                this.log(upstreamName + " cancel");
                this.subscription.cancel();
            }
        }
    }

    /**
     * Operator that performs the upstream subscription on a {@link StreamExecutor} instead of
     * the calling thread.
     */
    private static class SubscribeOn<T>
    extends StreamOperator<T, T> {
        private static final Logger log = LoggerFactory.getLogger(SubscribeOn.class);
        private final StreamExecutor streamExecutor;

        public SubscribeOn(Stream<? extends T> stream, StreamExecutor streamExecutor) {
            super(stream);
            this.streamExecutor = streamExecutor;
        }

        /**
         * Hands the downstream subscriber its subscription immediately (on the calling thread),
         * schedules the upstream subscription on the executor and returns {@code null} so that
         * the caller stops assembling the chain itself.
         */
        @Override
        Subscriber<? super T> subscribeOrReturn(Subscriber<? super T> actual) {
            SubscribeOnSubscriber<T> subscribeOnSubscriber = new SubscribeOnSubscriber<T>(actual);
            actual.onSubscribe(subscribeOnSubscriber);
            subscribeOnSubscriber.start();
            return null;
        }

        /**
         * Bridges downstream and the asynchronously subscribed upstream. Demand requested before
         * the upstream subscription exists is accumulated and replayed once it arrives.
         */
        private class SubscribeOnSubscriber<T>
        implements Subscriber<T>,
        Subscription {
            protected final Subscriber<? super T> actual;
            // null: upstream not subscribed yet; CancelledSubscription.INSTANCE: cancelled before
            // the upstream subscription arrived; otherwise the upstream subscription.
            final AtomicReference<Subscription> subscriptionRef = new AtomicReference();
            // Result of submitting the subscription task to the executor.
            Future<?> future;
            // Demand requested while subscriptionRef was still null.
            final AtomicLong requests = new AtomicLong(0L);

            public SubscribeOnSubscriber(Subscriber<? super T> actual) {
                this.actual = actual;
            }

            /**
             * Stores the upstream subscription unless a cancel already happened; in that case
             * the executor is shut down and the upstream is cancelled right away. Otherwise any
             * demand accumulated so far is requested from upstream.
             */
            @Override
            public void onSubscribe(final Subscription subscription) {
                Subscription s = this.subscriptionRef.updateAndGet(new UnaryOperator<Subscription>(){

                    @Override
                    public Subscription apply(Subscription s) {
                        if (s == CancelledSubscription.INSTANCE) {
                            return s;
                        }
                        return subscription;
                    }
                });
                if (s == CancelledSubscription.INSTANCE) {
                    try {
                        SubscribeOn.this.streamExecutor.shutdown();
                    }
                    finally {
                        log.info("cancel upstream: " + subscription.getClass().getSimpleName());
                        subscription.cancel();
                    }
                } else {
                    long l = this.requests.getAndSet(0L);
                    if (l > 0L) {
                        this.requestUpstream(l, subscription);
                    }
                }
            }

            /** Forwards the element unless the cancelled marker is set. */
            @Override
            public void onNext(T record) {
                if (this.subscriptionRef.get() == CancelledSubscription.INSTANCE) {
                    return;
                }
                this.actual.onNext(record);
            }

            /** Forwards the error unconditionally (no cancelled check, unlike onNext/onComplete). */
            @Override
            public void onError(Throwable t) {
                this.actual.onError(t);
            }

            /** Forwards completion unless the cancelled marker is set. */
            @Override
            public void onComplete() {
                if (this.subscriptionRef.get() == CancelledSubscription.INSTANCE) {
                    return;
                }
                this.actual.onComplete();
            }

            /** Requests {@code n} from {@code s} unless the cancelled marker is set. */
            void requestUpstream(long n, Subscription s) {
                if (this.subscriptionRef.get() == CancelledSubscription.INSTANCE) {
                    return;
                }
                s.request(n);
            }

            /**
             * Buffers the demand while no upstream subscription is stored; otherwise requests
             * the new demand plus anything still buffered.
             */
            @Override
            public void request(long n) {
                if (this.subscriptionRef.get() == null) {
                    this.requests.addAndGet(n);
                } else {
                    long l = this.requests.getAndSet(0L);
                    this.requestUpstream(l + n, this.subscriptionRef.get());
                }
            }

            /**
             * If an upstream subscription is stored, it is left in the reference, the executor
             * is shut down and the upstream is cancelled. If none is stored yet, the cancelled
             * marker is stored so that a later {@link #onSubscribe} cancels the upstream.
             */
            @Override
            public void cancel() {
                Subscription subscription = this.subscriptionRef.updateAndGet(new UnaryOperator<Subscription>(){

                    @Override
                    public Subscription apply(Subscription subscription) {
                        if (subscription != null && subscription != CancelledSubscription.INSTANCE) {
                            return subscription;
                        }
                        return CancelledSubscription.INSTANCE;
                    }
                });
                if (subscription != null && subscription != CancelledSubscription.INSTANCE) {
                    try {
                        SubscribeOn.this.streamExecutor.shutdown();
                    }
                    finally {
                        log.info("cancel upstream: " + subscription.getClass().getSimpleName());
                        subscription.cancel();
                    }
                }
            }

            /**
             * Submits the upstream subscription to the executor. The MDC entries
             * {@code requestId}, {@code traceId} and the configured trace key are read on the
             * calling thread and set on the executor thread for the duration of the task.
             * Failures are logged and rethrown; the executor is shut down when the task ends.
             */
            void start() {
                final String requestId = MDC.get((String)"requestId");
                final String traceId = MDC.get((String)"traceId");
                final String msgTraceId = MDC.get((String)_trace_id_static_);
                this.future = SubscribeOn.this.streamExecutor.submit(new Runnable(){

                    @Override
                    public void run() {
                        MDC.put((String)"requestId", (String)requestId);
                        MDC.put((String)"traceId", (String)traceId);
                        MDC.put((String)_trace_id_static_, (String)msgTraceId);
                        try {
                            SubscribeOn.this.stream.subscribe(SubscribeOnSubscriber.this);
                        }
                        catch (Throwable e) {
                            log.error(e.getMessage(), e);
                            throw Util.convert2RuntimeException(e);
                        }
                        finally {
                            SubscribeOn.this.streamExecutor.shutdown();
                            MDC.remove((String)"requestId");
                            MDC.remove((String)"traceId");
                            MDC.remove((String)_trace_id_static_);
                        }
                    }
                });
            }
        }
    }

    /**
     * Executor used by {@code subscribeOn}. Besides {@link Executor#execute} it must support
     * submitting a task with a {@link Future} result and being shut down.
     */
    public static interface StreamExecutor
    extends Executor {
        /**
         * Invoked by the subscribe-on operator when the subscription is cancelled and when the
         * subscription task finishes.
         */
        public void shutdown();

        /**
         * Schedules the given task (the upstream subscription) for execution.
         *
         * @param var1 task to run
         * @return a future representing the task
         */
        public Future<?> submit(Runnable var1);
    }

    /**
     * Base class of all intermediate stages. An operator remembers its upstream stream and
     * knows how to wrap a downstream subscriber into the subscriber it presents upstream.
     *
     * @param <IN> element type received from upstream
     * @param <OUT> element type emitted downstream
     */
    static abstract class StreamOperator<IN, OUT>
    extends Stream<OUT> {
        /** The upstream this operator was applied to. */
        protected final Stream<? extends IN> stream;
        /** The upstream viewed as an operator, or {@code null} when the upstream is not an operator. */
        protected final StreamOperator<?, ? extends OUT> streamOperator;

        public StreamOperator(Stream<? extends IN> stream) {
            this.stream = stream;
            if (stream instanceof StreamOperator) {
                this.streamOperator = (StreamOperator<IN, OUT>)stream;
            } else {
                this.streamOperator = null;
            }
        }

        /**
         * Wraps the downstream subscriber. Returning {@code null} tells the caller that the
         * operator has taken care of the subscription itself.
         */
        abstract Subscriber<? super IN> subscribeOrReturn(Subscriber<? super OUT> var1);

        /**
         * @return the upstream as an operator, or {@code null} when the upstream is the source
         */
        protected StreamOperator<?, ? extends OUT> nextStreamOperator() {
            if (this.stream instanceof StreamOperator) {
                return (StreamOperator)this.stream;
            }
            return null;
        }

        /**
         * @return the upstream stream as a publisher
         */
        protected Publisher<? extends IN> source() {
            Publisher<? extends IN> upstream = this.stream;
            return upstream;
        }
    }

    /**
     * Callback run after the stream's subscribe call has returned or thrown.
     */
    public static interface DoFinally {
        /**
         * @param var1 a tag describing why the callback runs; {@code Stream.subscribe} passes
         *        the literal {@code "ANY"}
         */
        public void runFinally(String var1);
    }

    /**
     * Operator that applies an {@link OnError} policy to error signals; all other signals pass
     * through unchanged.
     */
    private static class OnErrorOperator<T>
    extends StreamOperator<T, T> {
        private static final Logger log = LoggerFactory.getLogger(OnErrorOperator.class);
        /** Policy applied when an error signal arrives. */
        final OnError onError;

        public OnErrorOperator(Stream<? extends T> stream, OnError onError) {
            super(stream);
            this.onError = onError;
        }

        @Override
        Subscriber<? super T> subscribeOrReturn(Subscriber<? super T> actual) {
            return new OnErrorSubscriber<T>(actual);
        }

        /** Subscriber/subscription pair that intercepts {@code onError}. */
        private class OnErrorSubscriber<T>
        implements Subscriber<T>,
        Subscription {
            final Subscriber<? super T> actual;
            Subscription subscription;

            public OnErrorSubscriber(Subscriber<? super T> actual) {
                this.actual = actual;
            }

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                this.actual.onSubscribe(this);
            }

            @Override
            public void onNext(T record) {
                this.actual.onNext(record);
            }

            /**
             * Applies the error policy, guarded by the {@code Context.upOnError} flag
             * (NOTE(review): appears to be a thread-local Boolean - confirm in {@code Context}).
             *
             * <p>When the flag is not set: it is set for the duration of the call, then
             * STOP forwards the error downstream and cancels the upstream afterwards (even if
             * the downstream handler throws); CONTINUE forwards the error and logs an
             * {@code Exception} thrown by the downstream handler instead of propagating it.
             * The flag is removed on exit. When the flag is already set, the error is simply
             * forwarded downstream.
             *
             * <p>Decompiler notes (CFR): "Enabled force condition propagation",
             * "Lifted jumps to return sites".
             */
            @Override
            public void onError(Throwable t) {
                if (Context.upOnError.get() == null || !Context.upOnError.get().booleanValue()) {
                    try {
                        Context.upOnError.set(true);
                        if (OnErrorOperator.this.onError == OnError.STOP) {
                            try {
                                this.actual.onError(t);
                                return;
                            }
                            finally {
                                this.cancel();
                            }
                        }
                        // Any policy other than CONTINUE (e.g. null) drops the error here.
                        if (OnErrorOperator.this.onError != OnError.CONTINUE) return;
                        try {
                            this.actual.onError(t);
                            return;
                        }
                        catch (Exception e) {
                            log.error(e.getMessage(), (Throwable)e);
                        }
                        return;
                    }
                    finally {
                        Context.upOnError.remove();
                    }
                } else {
                    this.actual.onError(t);
                }
            }

            @Override
            public void onComplete() {
                this.actual.onComplete();
            }

            @Override
            public void request(long n) {
                this.subscription.request(n);
            }

            @Override
            public void cancel() {
                this.subscription.cancel();
            }
        }
    }

    /**
     * Error policy consulted by {@code OnErrorOperator}'s subscriber when an error signal arrives.
     */
    static enum OnError {
        /** Forward the error downstream, then cancel the upstream subscription. */
        STOP,
        /**
         * Forward the error downstream without cancelling; an {@code Exception} thrown by the
         * downstream error handler is logged instead of propagated.
         */
        CONTINUE;

    }

    /**
     * Operator that buffers upstream records and emits them downstream as lists of
     * {@code partition} elements. A final, possibly shorter, list is emitted on completion.
     *
     * <p>Because a batch is emitted only when the buffer size <em>equals</em> {@code partition},
     * a non-positive {@code partition} never triggers an intermediate emission: all records are
     * then delivered as one list when the upstream completes.
     */
    private static class PartitionOperator<T>
    extends StreamOperator<T, List<T>> {
        private static final Logger log = LoggerFactory.getLogger(PartitionOperator.class);
        /** Number of records per emitted list. */
        final int partition;

        /**
         * @param stream    upstream stream, passed to the {@code StreamOperator} superclass
         * @param partition number of records collected before a list is emitted
         */
        public PartitionOperator(Stream<? extends T> stream, int partition) {
            super(stream);
            this.partition = partition;
        }

        /**
         * Wraps the downstream list subscriber in a new {@code PartitionSubscriber}.
         *
         * @param actual downstream subscriber receiving the batched lists
         * @return the buffering subscriber to be attached to the upstream
         */
        @Override
        Subscriber<? super T> subscribeOrReturn(Subscriber<? super List<T>> actual) {
            // Parameterized instantiation (was a raw type), matching the sibling operators.
            return new PartitionSubscriber<T>(actual);
        }

        /**
         * Buffers records in {@link #list} and forwards full batches downstream.
         */
        private class PartitionSubscriber<T>
        implements Subscriber<T>,
        Subscription {
            /** Downstream subscriber of the batched lists. */
            final Subscriber<? super List<T>> actual;
            /** Upstream subscription, set in {@link #onSubscribe(Subscription)}. */
            Subscription subscription;
            /** Current batch; replaced by a fresh list after each emission. */
            List<T> list = new ArrayList<T>();

            public PartitionSubscriber(Subscriber<? super List<T>> actual) {
                this.actual = actual;
            }

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                this.actual.onSubscribe(this);
            }

            /** Adds the record to the batch and emits the batch once it holds {@code partition} records. */
            @Override
            public void onNext(T record) {
                this.list.add(record);
                if (this.list.size() == PartitionOperator.this.partition) {
                    this.actual.onNext(this.list);
                    // The emitted list now belongs to downstream; start a new one.
                    this.list = new ArrayList<T>();
                }
            }

            /** Forwards the error; records still buffered are not emitted. */
            @Override
            public void onError(Throwable t) {
                this.actual.onError(t);
            }

            /** Emits any remaining buffered records as a last list, then completes downstream. */
            @Override
            public void onComplete() {
                if (this.list != null && !this.list.isEmpty()) {
                    this.actual.onNext(this.list);
                    // Type-safe immutable empty list (was the raw Collections.EMPTY_LIST).
                    this.list = Collections.emptyList();
                }
                this.actual.onComplete();
            }

            @Override
            public void request(long n) {
                this.subscription.request(n);
            }

            @Override
            public void cancel() {
                this.subscription.cancel();
            }
        }
    }

    /**
     * Operator that lets at most {@code max} records through. When a further record arrives, the
     * upstream subscription is cancelled and that record is dropped.
     */
    private static class CancelOnMaxOperator<T>
    extends StreamOperator<T, T> {
        private static final Logger log = LoggerFactory.getLogger(CancelOnMaxOperator.class);
        /** Maximum number of records forwarded downstream. */
        final int max;

        /**
         * @param stream upstream stream, passed to the {@code StreamOperator} superclass
         * @param max    maximum number of records forwarded downstream
         */
        public CancelOnMaxOperator(Stream<? extends T> stream, int max) {
            super(stream);
            this.max = max;
        }

        /**
         * Wraps the downstream subscriber in a new counting {@code CancelOnMaxSubscriber}.
         *
         * @param actual downstream subscriber
         * @return the counting subscriber to be attached to the upstream
         */
        @Override
        Subscriber<? super T> subscribeOrReturn(Subscriber<? super T> actual) {
            return new CancelOnMaxSubscriber<T>(actual);
        }

        /**
         * Counts received records and cancels the upstream once the count exceeds {@code max}.
         */
        private class CancelOnMaxSubscriber<T>
        implements Subscriber<T>,
        Subscription {
            /** Downstream subscriber. */
            final Subscriber<? super T> actual;
            /** Upstream subscription, set in {@link #onSubscribe(Subscription)}. */
            Subscription subscription;
            /** Number of records received so far, including dropped ones. */
            private final AtomicInteger count = new AtomicInteger(0);

            public CancelOnMaxSubscriber(Subscriber<? super T> actual) {
                this.actual = actual;
            }

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                this.actual.onSubscribe(this);
            }

            /**
             * Forwards the record while the running count is within {@code max}. A record that
             * pushes the count past {@code max} cancels the upstream and is not forwarded, so
             * downstream never sees more than {@code max} records.
             */
            @Override
            public void onNext(T record) {
                if (this.count.incrementAndGet() > CancelOnMaxOperator.this.max) {
                    log.info("cancel: > max({})", CancelOnMaxOperator.this.max);
                    this.cancel();
                    // Previously the overflowing record was still delivered (max + 1 items).
                    return;
                }
                this.actual.onNext(record);
            }

            @Override
            public void onError(Throwable t) {
                this.actual.onError(t);
            }

            @Override
            public void onComplete() {
                this.actual.onComplete();
            }

            @Override
            public void request(long n) {
                this.subscription.request(n);
            }

            @Override
            public void cancel() {
                this.subscription.cancel();
            }
        }
    }

    /**
     * Operator that runs a {@code DoFinally} callback at most once.
     *
     * <p>When {@code consumer} is {@code true} the subscriber triggers the callback itself after
     * completion ({@code "ON_COMPLETE"}) and after cancellation ({@code "CANCEL"}). When it is
     * {@code false} the callback only runs if something calls the subscriber's
     * {@code runFinally} directly.
     *
     * <p>NOTE(review): the once-guard lives on the operator, not on the subscriber, so the
     * callback fires once per operator instance even across several subscriptions. Confirm
     * this is intended.
     */
    private static class DoFinallyOperator<T>
    extends StreamOperator<T, T> {
        /** User callback. */
        private final DoFinally doFinally;
        /** Whether completion/cancellation trigger the callback automatically. */
        private final boolean consumer;
        /** Ensures the callback runs at most once for this operator. */
        private final AtomicBoolean once = new AtomicBoolean(false);

        /**
         * @param stream    upstream stream, passed to the {@code StreamOperator} superclass
         * @param doFinally callback to run
         * @param consumer  {@code true} to run the callback automatically on complete/cancel
         */
        public DoFinallyOperator(Stream<? extends T> stream, DoFinally doFinally, boolean consumer) {
            super(stream);
            this.doFinally = doFinally;
            this.consumer = consumer;
        }

        /**
         * Wraps the downstream subscriber in a new {@code DoFinallySubscriber}.
         *
         * @param actual downstream subscriber
         * @return the wrapping subscriber to be attached to the upstream
         */
        @Override
        Subscriber<? super T> subscribeOrReturn(Subscriber<? super T> actual) {
            return new DoFinallySubscriber<T>(actual);
        }

        /**
         * Pass-through subscriber that hooks the callback into completion and cancellation.
         * Error signals are forwarded without triggering the callback.
         */
        private class DoFinallySubscriber<T>
        implements Subscriber<T>,
        Subscription,
        DoFinally {
            /** Downstream subscriber. */
            final Subscriber<? super T> actual;
            /** Upstream subscription, set in {@link #onSubscribe(Subscription)}. */
            Subscription subscription;

            public DoFinallySubscriber(Subscriber<? super T> actual) {
                this.actual = actual;
            }

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                this.actual.onSubscribe(this);
            }

            @Override
            public void onNext(T record) {
                this.actual.onNext(record);
            }

            // NOTE(review): errors do not trigger the callback here; presumably a cancel
            // issued elsewhere covers that path. Verify against callers.
            @Override
            public void onError(Throwable t) {
                this.actual.onError(t);
            }

            /** Completes downstream, then (even if that throws) fires the callback when enabled. */
            @Override
            public void onComplete() {
                try {
                    this.actual.onComplete();
                }
                finally {
                    this.fireIfEnabled("ON_COMPLETE");
                }
            }

            @Override
            public void request(long n) {
                this.subscription.request(n);
            }

            /** Cancels upstream, then (even if that throws) fires the callback when enabled. */
            @Override
            public void cancel() {
                try {
                    this.subscription.cancel();
                }
                finally {
                    this.fireIfEnabled("CANCEL");
                }
            }

            /**
             * Runs the user callback unless it has already run for this operator. Anything the
             * callback throws is rethrown via {@code Util.convert2RuntimeException}.
             *
             * @param type label passed through to the callback
             */
            @Override
            public void runFinally(String type) {
                if (!DoFinallyOperator.this.once.compareAndSet(false, true)) {
                    return;
                }
                try {
                    DoFinallyOperator.this.doFinally.runFinally(type);
                }
                catch (Throwable e) {
                    throw Util.convert2RuntimeException(e);
                }
            }

            /** Fires the callback only when the operator was created with {@code consumer == true}. */
            private void fireIfEnabled(String type) {
                if (DoFinallyOperator.this.consumer) {
                    this.runFinally(type);
                }
            }
        }
    }

    /**
     * Mutable fluent wrapper around a {@code Stream}. Each operator method replaces the wrapped
     * stream with the result of the same-named {@code Stream} call and returns {@code this}
     * (cast to the self type {@code SE}), so subclasses can chain their own methods.
     *
     * @param <T>  element type exposed by the fluent methods
     * @param <SE> self type of the concrete subclass
     */
    public static class StreamExt<T, SE extends StreamExt<T, SE>> {
        /** Currently wrapped stream; reassigned by every operator call. */
        private Stream stream;

        /**
         * @param stream initial stream to wrap
         */
        protected StreamExt(Stream stream) {
            this.stream = stream;
        }

        /**
         * @return the stream currently wrapped
         */
        protected Stream getStream() {
            return this.stream;
        }

        /** Applies {@code Stream.map(mapper)} to the wrapped stream. */
        public final <R> SE map(Function<? super T, ? extends R> mapper) {
            return this.linkStream(this.stream.map(mapper));
        }

        /** Applies {@code Stream.filter(filter)} to the wrapped stream. */
        public final SE filter(Predicate<? super T> filter) {
            return this.linkStream(this.stream.filter(filter));
        }

        /** Same as {@link #log(Logger, Level, EntryLogger)} at {@code Level.INFO}. */
        public final SE log(Logger logger, EntryLogger entryLogger) {
            return this.log(logger, Level.INFO, entryLogger);
        }

        /** Same as {@link #log(Logger, EntryLogger)}, rendering records with {@code String.valueOf}. */
        public final SE log(Logger logger) {
            return this.log(logger, record -> String.valueOf(record));
        }

        /** Applies {@code Stream.log(logger, level, entryLogger)} to the wrapped stream. */
        public final SE log(Logger logger, Level level, EntryLogger entryLogger) {
            return this.linkStream(this.stream.log(logger, level, entryLogger));
        }

        /** Applies {@code Stream.onErrorStop()} to the wrapped stream. */
        public final SE onErrorStop() {
            return this.linkStream(this.stream.onErrorStop());
        }

        /** Applies {@code Stream.onErrorContinue()} to the wrapped stream. */
        public final SE onErrorContinue() {
            return this.linkStream(this.stream.onErrorContinue());
        }

        /** Applies {@code Stream.partition(partition)} to the wrapped stream. */
        public final SE partition(int partition) {
            return this.linkStream(this.stream.partition(partition));
        }

        /** Applies {@code Stream.cancelOnMax(max)} to the wrapped stream. */
        public final SE cancelOnMax(int max) {
            return this.linkStream(this.stream.cancelOnMax(max));
        }

        /** Applies {@code Stream.subscribeOn(streamExecutor)} to the wrapped stream. */
        public final SE subscribeOn(StreamExecutor streamExecutor) {
            return this.linkStream(this.stream.subscribeOn(streamExecutor));
        }

        /** Applies {@code Stream.doFinally(doFinally)} to the wrapped stream. */
        public final SE doFinally(DoFinally doFinally) {
            return this.linkStream(this.stream.doFinally(doFinally));
        }

        /** Applies {@code Stream.doFinally(doFinally, consumer)} to the wrapped stream. */
        public final SE doFinally(DoFinally doFinally, boolean consumer) {
            return this.linkStream(this.stream.doFinally(doFinally, consumer));
        }

        /**
         * Subscribes {@code consumer} to the wrapped stream.
         *
         * @throws Exception whatever {@code Stream.subscribe(Consumer)} throws
         */
        public final void subscribe(Consumer<? super T> consumer) throws Exception {
            this.stream.subscribe(consumer);
        }

        /** Subscribes {@code actual} to the wrapped stream. */
        public final void subscribe(Subscriber<? super T> actual) {
            this.stream.subscribe(actual);
        }

        /**
         * Replaces the wrapped stream and returns {@code this} as the self type.
         *
         * @param stream stream to wrap from now on
         * @return this instance, cast to {@code SE}
         */
        protected final SE linkStream(Stream stream) {
            this.stream = stream;
            return (SE)this;
        }
    }

    /**
     * {@code Out} implementation that spreads written bytes over a sequence of files inside the
     * directory {@code filePath}.
     *
     * <p>Each file is named {@code <group>_<offset>}, where the offset is the total number of
     * bytes written through this instance at the moment the file is opened. After a write, if
     * the current file holds more than {@code maxFileLength} bytes it is closed and the next
     * write opens a new file; a file can therefore exceed the limit by up to one write.
     * The directory is not created by this class. All public methods are {@code synchronized}.
     */
    public static class FileGroupOut
    implements Out {
        /** Directory the files are created in. */
        private final String filePath;
        /** File-name prefix. */
        private final String group;
        /** Size threshold (bytes) after which the current file is closed. */
        private final long maxFileLength;
        /** Bytes written to the currently open file. */
        private long writingLength;
        /** Currently open file stream, or {@code null} when none is open. */
        private OutputStream outputStream;
        /** Total bytes written across all files. */
        private long sumLength;

        /**
         * @param filePath      directory the files are created in
         * @param group         file-name prefix
         * @param maxFileLength size threshold in bytes; must be positive
         * @throws IllegalArgumentException if {@code maxFileLength <= 0}
         */
        public FileGroupOut(String filePath, String group, long maxFileLength) {
            if (maxFileLength <= 0L) {
                throw new IllegalArgumentException("maxFileLength");
            }
            this.filePath = filePath;
            this.group = group;
            this.maxFileLength = maxFileLength;
        }

        /**
         * Appends {@code bytes} to the current file, opening a new one first if none is open,
         * and closes the file once it has grown beyond {@code maxFileLength}.
         *
         * @param bytes data to append
         * @throws IOException if the file cannot be opened, written, flushed or closed
         */
        @Override
        public synchronized void write(byte[] bytes) throws IOException {
            if (this.outputStream == null) {
                File target = new File(new File(this.filePath), this.group + "_" + this.sumLength);
                this.outputStream = new BufferedOutputStream(new FileOutputStream(target));
                this.writingLength = 0L;
            }
            this.outputStream.write(bytes);
            this.writingLength += (long)bytes.length;
            this.sumLength += (long)bytes.length;
            if (this.writingLength > this.maxFileLength) {
                this.closeCurrent();
            }
        }

        /**
         * Encodes {@code text} as UTF-8 and writes it via {@link #write(byte[])}.
         *
         * @param text text to append
         * @throws IOException if the underlying write fails
         */
        @Override
        public synchronized void write(String text) throws IOException {
            this.write(text.getBytes(StandardCharsets.UTF_8));
        }

        /**
         * Flushes and closes {@code out}. The stream is closed even when the flush fails, so a
         * failed flush can no longer leak the file descriptor. Despite the name, only
         * {@code flush()} and {@code close()} are called; no {@code FileDescriptor.sync()}.
         */
        private void fsyncAndClose(OutputStream out) throws IOException {
            try (OutputStream closing = out) {
                closing.flush();
            }
        }

        /**
         * Detaches and closes the current stream, if any. The field is cleared before closing so
         * that a failure never leaves a half-closed stream behind for later writes.
         */
        private void closeCurrent() throws IOException {
            OutputStream current = this.outputStream;
            if (current == null) {
                return;
            }
            this.outputStream = null;
            this.fsyncAndClose(current);
        }

        /**
         * Flushes and closes the currently open file, if any. Writing again afterwards opens a
         * new file.
         *
         * @throws IOException if flushing or closing fails
         */
        @Override
        public synchronized void close() throws IOException {
            this.closeCurrent();
        }
    }

    /**
     * Closeable sink that accepts text or raw bytes.
     */
    public static interface Out
    extends Closeable {
        /**
         * Writes a piece of text to the sink.
         *
         * @param var1 text to write
         * @throws IOException if the write fails
         */
        public void write(String var1) throws IOException;

        /**
         * Writes raw bytes to the sink.
         *
         * @param var1 bytes to write
         * @throws IOException if the write fails
         */
        public void write(byte[] var1) throws IOException;
    }

    /**
     * String constants naming two synchronization states. Their consumers are not visible in
     * this section of the file.
     */
    public static interface SyncStatus {
        /** State label {@code "syncing"}. */
        public static final String SYNCING = "syncing";
        /** State label {@code "complete"}. */
        public static final String COMPLETE = "complete";
    }

    /**
     * Base implementation of {@code MsgPerContext}: a {@code seq} value, a concurrent attribute
     * map, and (deprecated) access to a JVM-wide and an optional cluster {@code CacheService}.
     * {@code logger()} is left to subclasses.
     *
     * <p>Instances created with {@link #MsgPerContextImpl(String)} have no cluster cache:
     * {@link #cluster()} then returns {@code null} and the {@code clusterCache(...)} methods
     * throw {@code NullPointerException}.
     */
    public static abstract class MsgPerContextImpl
    implements MsgPerContext {
        private final String seq;
        /** Attribute store; being a {@code ConcurrentHashMap} it rejects null keys and values. */
        private final Map<String, Object> attrs = new ConcurrentHashMap<String, Object>();
        @Deprecated
        private final CacheService jvmCacheService = CacheService.staticCache();
        @Deprecated
        private final CacheService<String, String> clusterCacheService;

        /**
         * Creates a context without a cluster cache.
         *
         * @param seq value returned by {@link #seq()}
         */
        protected MsgPerContextImpl(String seq) {
            this.seq = seq;
            this.clusterCacheService = null;
        }

        /**
         * Creates a context backed by the given cluster cache.
         *
         * @param seq                 value returned by {@link #seq()}
         * @param clusterCacheService cache used by the {@code clusterCache(...)} methods
         */
        @Deprecated
        public MsgPerContextImpl(String seq, CacheService clusterCacheService) {
            this.seq = seq;
            this.clusterCacheService = clusterCacheService;
        }

        /** Returns the attribute stored under {@code key}, or {@code null} if absent. */
        @Override
        public final Object getAttr(String key) {
            return this.attrs.get(key);
        }

        /** Stores {@code value} under {@code key}, replacing any previous value. */
        @Override
        public final void setAttr(String key, Object value) {
            this.attrs.put(key, value);
        }

        /**
         * Returns the attribute under {@code key}, computing and storing it with
         * {@code mappingFunction} when absent (see {@code Map.computeIfAbsent}).
         * The caller is responsible for {@code V} matching the stored value's type.
         */
        @Override
        @SuppressWarnings("unchecked")
        public <V> V setAttrIfAbsentAndGet(String key, Function<String, ? extends V> mappingFunction) {
            // computeIfAbsent on Map<String, Object> yields Object; the cast restores the caller's type.
            return (V)this.attrs.computeIfAbsent(key, mappingFunction);
        }

        /**
         * Returns the attribute under {@code key} if present; otherwise evaluates {@code value},
         * stores a non-null result under {@code key} and returns it. A {@code null} result is
         * returned without being stored.
         */
        @Override
        @SuppressWarnings("unchecked")
        public final synchronized <T> T cache(String key, Supplier<T> value) {
            if (this.attrs.containsKey(key)) {
                return (T)this.attrs.get(key);
            }
            T t = value.get();
            if (t == null) {
                return null;
            }
            this.attrs.put(key, t);
            return t;
        }

        /** Removes the attribute under {@code key} and returns it, or {@code null} if absent. */
        @Override
        @SuppressWarnings("unchecked")
        public final synchronized <T> T evict(String key) {
            return (T)this.attrs.remove(key);
        }

        /** Looks {@code key} up in the JVM cache service. */
        @Override
        @SuppressWarnings("unchecked")
        public <T> T jvmCache(String key) {
            return (T)this.jvmCacheService.get(key);
        }

        /**
         * Evaluates {@code value} and passes the result to the JVM cache service's
         * {@code expire(key, value)}, returning what that call returns.
         */
        @Override
        @SuppressWarnings("unchecked")
        public final <T> T jvmCache(String key, Supplier<T> value) {
            // Raw-typed service call returns Object; cast back to the caller's type.
            return (T)this.jvmCacheService.expire(key, value.get());
        }

        /**
         * Evaluates {@code value} and passes the result to the JVM cache service's
         * {@code expire(key, value, time, timeUnit)}, returning what that call returns.
         */
        @Override
        @SuppressWarnings("unchecked")
        public final <T> T jvmCache(String key, Supplier<T> value, long time, TimeUnit timeUnit) {
            return (T)this.jvmCacheService.expire(key, value.get(), time, timeUnit);
        }

        /**
         * Looks {@code key} up in the cluster cache.
         *
         * @throws NullPointerException if no cluster cache was supplied at construction
         */
        @Override
        public String clusterCache(String key) {
            return this.clusterCacheService.get(key);
        }

        /**
         * Evaluates {@code value} and passes the result to the cluster cache's
         * {@code expire(key, value)}.
         *
         * @throws NullPointerException if no cluster cache was supplied at construction
         */
        @Override
        public final String clusterCache(String key, Supplier<String> value) {
            return this.clusterCacheService.expire(key, value.get());
        }

        /**
         * Evaluates {@code value} and passes the result to the cluster cache's
         * {@code expire(key, value, time, timeUnit)}.
         *
         * @throws NullPointerException if no cluster cache was supplied at construction
         */
        @Override
        public final String clusterCache(String key, Supplier<String> value, long time, TimeUnit timeUnit) {
            return this.clusterCacheService.expire(key, value.get(), time, timeUnit);
        }

        /** Returns the cluster cache, or {@code null} when none was supplied at construction. */
        @Override
        public final CacheService<String, String> cluster() {
            return this.clusterCacheService;
        }

        /** Returns the {@code seq} value given at construction. */
        @Override
        public final String seq() {
            return this.seq;
        }
    }

    public static interface MsgPerContext {
        public Object getAttr(String var1);

        public void setAttr(String var1, Object var2);

        public <V> V setAttrIfAbsentAndGet(String var1, Function<String, ? extends V> var2);

        @Deprecated
        public <T> T cache(String var1, Supplier<T> var2);

        @Deprecated
        public <T> T evict(String var1);

        @Deprecated
        public <T> T jvmCache(String var1);

        @Deprecated
        public <T> T jvmCache(String var1, Supplier<T> var2);

        @Deprecated
        public <T> T jvmCache(String var1, Supplier<T> var2, long var3, TimeUnit var5);

        @Deprecated
        public String clusterCache(String var1);

        @Deprecated
        public String clusterCache(String var1, Supplier<String> var2);

        @Deprecated
        public String clusterCache(String var1, Supplier<String> var2, long var3, TimeUnit var5);

        @Deprecated
        public CacheService<String, String> cluster();

        public String seq();

        public PerLogger logger();

        public static interface SeqGenerator {
            public String seq();
        }

        public static interface PerLogger {
            default public void debug(Logger logger, String message) {
                if (logger.isDebugEnabled()) {
                    logger.debug(message);
                }
            }

            default public void info(Logger logger, String message) {
                if (logger.isInfoEnabled()) {
                    logger.info(message);
                }
            }

            default public void error(Logger logger, Exception e) {
                if (logger.isErrorEnabled()) {
                    logger.error(e.getMessage(), (Throwable)e);
                }
            }

            default public void error(Logger logger, Throwable e, String message) {
                if (logger.isErrorEnabled()) {
                    logger.error(message, e);
                }
            }

            default public void error(Logger logger, String message) {
                if (logger.isErrorEnabled()) {
                    logger.error(message);
                }
            }

            default public void warn(Logger logger, String message) {
                if (logger.isWarnEnabled()) {
                    logger.warn(message);
                }
            }
        }
    }

    /**
     * Holder for thread-local state shared by the stream operators. Not instantiable.
     */
    private static class Context {
        /**
         * Per-thread flag that {@code OnErrorOperator}'s subscriber sets to {@code true} while it
         * applies its error policy and removes afterwards; when the flag is already set, that
         * subscriber forwards errors without applying the policy again.
         */
        static final ThreadLocal<Boolean> upOnError = new ThreadLocal<Boolean>();

        private Context() {
        }

        /**
         * @return the shared {@link #upOnError} thread-local
         */
        static ThreadLocal<Boolean> upOnError() {
            return upOnError;
        }
    }

    /**
     * No-op {@code Subscription}: both {@code cancel()} and {@code request(long)} do nothing.
     * A shared instance is available as {@link #INSTANCE}.
     */
    static final class CancelledSubscription
    implements Subscription {
        /** Shared instance; the class holds no state. */
        static final CancelledSubscription INSTANCE = new CancelledSubscription();

        CancelledSubscription() {
        }

        /** Does nothing. */
        @Override
        public void cancel() {
        }

        /** Does nothing; the requested amount is ignored. */
        @Override
        public void request(long n) {
        }
    }
}

