/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.shaded.org.apache.ignite.internal.client.table;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.Function;
import org.apache.ignite.shaded.org.apache.ignite.client.RetryLimitPolicy;
import org.apache.ignite.shaded.org.apache.ignite.client.RetryPolicy;
import org.apache.ignite.shaded.org.apache.ignite.internal.client.ClientUtils;
import org.apache.ignite.shaded.org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.shaded.org.apache.ignite.internal.client.proto.StreamerReceiverSerializer;
import org.apache.ignite.shaded.org.apache.ignite.internal.client.table.ClientTable;
import org.apache.ignite.shaded.org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.shaded.org.apache.ignite.internal.streamer.StreamerBatchSender;
import org.apache.ignite.shaded.org.apache.ignite.internal.streamer.StreamerOptions;
import org.apache.ignite.shaded.org.apache.ignite.internal.streamer.StreamerPartitionAwarenessProvider;
import org.apache.ignite.shaded.org.apache.ignite.internal.streamer.StreamerSubscriber;
import org.apache.ignite.shaded.org.apache.ignite.table.DataStreamerItem;
import org.apache.ignite.shaded.org.apache.ignite.table.DataStreamerOperationType;
import org.apache.ignite.shaded.org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.shaded.org.apache.ignite.table.DataStreamerReceiverDescriptor;
import org.apache.ignite.shaded.org.apache.ignite.table.ReceiverExecutionOptions;
import org.apache.ignite.shaded.org.jetbrains.annotations.Nullable;

class ClientDataStreamer {
    ClientDataStreamer() {
    }

    static <T> CompletableFuture<Void> streamData(Flow.Publisher<DataStreamerItem<T>> publisher, DataStreamerOptions options, StreamerBatchSender<T, Integer, Void> batchSender, StreamerPartitionAwarenessProvider<T, Integer> partitionAwarenessProvider, ClientTable tbl) {
        return ClientDataStreamer.streamData(publisher, DataStreamerItem::get, DataStreamerItem::get, x -> x.operationType() == DataStreamerOperationType.REMOVE, options, batchSender, null, partitionAwarenessProvider, tbl);
    }

    static <T, E, V, R, A> CompletableFuture<Void> streamData(Flow.Publisher<E> publisher, Function<E, T> keyFunc, Function<E, V> payloadFunc, Function<E, Boolean> deleteFunc, DataStreamerOptions options, StreamerPartitionAwarenessProvider<T, Integer> partitionAwarenessProvider, ClientTable tbl, @Nullable Flow.Subscriber<R> resultSubscriber, DataStreamerReceiverDescriptor<V, A, R> receiverDescriptor, @Nullable A receiverArg) {
        ReceiverExecutionOptions opts = receiverDescriptor.options();
        if (opts != null && !opts.equals(ReceiverExecutionOptions.DEFAULT)) {
            throw new UnsupportedOperationException("Receiver options are not supported yet.");
        }
        StreamerBatchSender batchSender = (partitionId, items, deleted) -> tbl.getPartitionAssignment().thenCompose(partitionAssignment -> tbl.channel().serviceAsync(66, out -> {
            assert (deleted == null || deleted.isEmpty()) : "Deletion is not supported with receiver.";
            ClientMessagePacker w = out.out();
            w.packInt(tbl.tableId());
            w.packInt((int)partitionId);
            w.packDeploymentUnits(receiverDescriptor.units());
            w.packBoolean(resultSubscriber != null);
            StreamerReceiverSerializer.serializeReceiverInfoOnClient(w, receiverDescriptor.receiverClassName(), receiverArg, receiverDescriptor.payloadMarshaller(), receiverDescriptor.argumentMarshaller(), items);
        }, in -> resultSubscriber != null ? StreamerReceiverSerializer.deserializeReceiverResultsOnClient(in.in(), receiverDescriptor.resultMarshaller()) : null, (String)partitionAssignment.get((int)partitionId), (RetryPolicy)new RetryLimitPolicy().retryLimit(options.retryLimit()), false));
        return ClientDataStreamer.streamData(publisher, keyFunc, payloadFunc, deleteFunc, options, batchSender, resultSubscriber, partitionAwarenessProvider, tbl);
    }

    private static <T, E, V, R> CompletableFuture<Void> streamData(Flow.Publisher<E> publisher, Function<E, T> keyFunc, Function<E, V> payloadFunc, Function<E, Boolean> deleteFunc, DataStreamerOptions options, StreamerBatchSender<V, Integer, R> batchSender, @Nullable Flow.Subscriber<R> resultSubscriber, StreamerPartitionAwarenessProvider<T, Integer> partitionAwarenessProvider, ClientTable tbl) {
        IgniteLogger log = ClientUtils.logger(tbl.channel().configuration(), StreamerSubscriber.class);
        StreamerOptions streamerOpts = ClientDataStreamer.streamerOptions(options);
        StreamerSubscriber<T, E, V, R, Integer> subscriber = new StreamerSubscriber<T, E, V, R, Integer>(batchSender, resultSubscriber, keyFunc, payloadFunc, deleteFunc, partitionAwarenessProvider, streamerOpts, tbl.channel().streamerFlushExecutor(), log, tbl.channel().metrics());
        publisher.subscribe(subscriber);
        return subscriber.completionFuture();
    }

    private static StreamerOptions streamerOptions(final DataStreamerOptions options) {
        return new StreamerOptions(){

            @Override
            public int pageSize() {
                return options.pageSize();
            }

            @Override
            public int perPartitionParallelOperations() {
                return options.perPartitionParallelOperations();
            }

            @Override
            public int autoFlushInterval() {
                return options.autoFlushInterval();
            }
        };
    }
}

