/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.agents.runtime;

import org.apache.flink.agents.plan.AgentPlan;
import org.apache.flink.agents.runtime.operator.ActionExecutionOperatorFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.types.Row;

/**
 * Static helpers that attach an agent, described by an {@link AgentPlan}, to a Flink stream by
 * appending an operator built from {@link ActionExecutionOperatorFactory}.
 */
public class CompileUtils {
    /**
     * Shared JSON mapper used to deserialize agent plans. A default-configured mapper is reused
     * across calls instead of being rebuilt (and its internal caches discarded) on every call.
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Connects a {@link Row}-keyed stream of {@link Row}s to the agent described by a JSON plan.
     *
     * <p>The plan is deserialized into an {@link AgentPlan}; the operator is created with
     * {@code inputIsJava = false} and emits {@code byte[]} records.
     * NOTE(review): presumably this overload serves non-Java callers - confirm against callers.
     *
     * @param inputDataStream keyed input stream feeding the agent
     * @param agentPlanJson JSON representation of the {@link AgentPlan}
     * @return the stream produced by the agent operator
     * @throws JsonProcessingException if {@code agentPlanJson} cannot be read as an {@link AgentPlan}
     */
    public static DataStream<byte[]> connectToAgent(KeyedStream<Row, Row> inputDataStream, String agentPlanJson) throws JsonProcessingException {
        AgentPlan agentPlan = OBJECT_MAPPER.readValue(agentPlanJson, AgentPlan.class);
        return connectToAgent(inputDataStream, agentPlan, TypeInformation.of(byte[].class), false);
    }

    /**
     * Keys {@code inputStream} with {@code keySelector} and connects the resulting keyed stream
     * to the agent.
     *
     * @param inputStream un-keyed input stream
     * @param keySelector selector used to key the input stream
     * @param agentPlan plan of the agent to execute
     * @param <IN> input record type
     * @param <K> key type
     * @return the stream produced by the agent operator, typed as {@code Object}
     */
    public static <IN, K> DataStream<Object> connectToAgent(DataStream<IN> inputStream, KeySelector<IN, K> keySelector, AgentPlan agentPlan) {
        KeyedStream<IN, K> keyedInputStream = inputStream.keyBy(keySelector);
        return connectToAgent(keyedInputStream, agentPlan);
    }

    /**
     * Connects an already keyed stream to the agent. The operator is created with
     * {@code inputIsJava = true} and its output is typed as {@code Object}.
     *
     * @param keyedInputStream keyed input stream feeding the agent
     * @param agentPlan plan of the agent to execute
     * @param <IN> input record type
     * @param <K> key type
     * @return the stream produced by the agent operator
     */
    public static <IN, K> DataStream<Object> connectToAgent(KeyedStream<IN, K> keyedInputStream, AgentPlan agentPlan) {
        return connectToAgent(keyedInputStream, agentPlan, TypeInformation.of(Object.class), true);
    }

    /**
     * Shared implementation: appends the operator named {@code "action-execute-operator"} to the
     * keyed stream and gives it the same parallelism as the keyed input stream.
     *
     * @param keyedInputStream keyed input stream feeding the agent
     * @param agentPlan plan handed to the operator factory
     * @param outTypeInformation type information of the operator's output records
     * @param inputIsJava flag forwarded unchanged to {@link ActionExecutionOperatorFactory}
     * @param <K> key type
     * @param <IN> input record type
     * @param <OUT> output record type
     * @return the stream produced by the agent operator
     */
    private static <K, IN, OUT> DataStream<OUT> connectToAgent(KeyedStream<IN, K> keyedInputStream, AgentPlan agentPlan, TypeInformation<OUT> outTypeInformation, boolean inputIsJava) {
        return keyedInputStream.transform("action-execute-operator", outTypeInformation, new ActionExecutionOperatorFactory(agentPlan, inputIsJava)).setParallelism(keyedInputStream.getParallelism());
    }
}

