/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.router.rmadmin;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationMethodWrapper;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.router.rmadmin.FederationRMAdminInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reflectively invokes a {@link ResourceManagerAdministrationProtocol} method
 * on one sub-cluster, or on every sub-cluster returned by
 * {@code FederationStateStoreFacade#getSubClusters(true)}.
 *
 * <p>The name of the protocol method to call is not passed in explicitly: it is
 * read from the current thread's stack trace (frame index 3), i.e. the method
 * that called {@link #invokeConcurrent(FederationRMAdminInterceptor, Class, String)}.
 * The parameter types and values come from the constructor arguments.
 *
 * <p>Instances keep the interceptor and facade of the most recent call in
 * fields; NOTE(review): this looks intended for one-shot, single-threaded use
 * per instance - confirm against callers.
 */
public class RMAdminProtocolMethod
extends FederationMethodWrapper {
    private static final Logger LOG = LoggerFactory.getLogger(RMAdminProtocolMethod.class);
    private FederationStateStoreFacade federationFacade;
    private FederationRMAdminInterceptor rmAdminInterceptor;
    private Configuration configuration;

    /**
     * Creates a wrapper describing the signature and arguments of the protocol
     * method that will later be invoked reflectively.
     *
     * @param pTypes parameter types of the target protocol method
     * @param pParams argument values passed to the target protocol method
     * @throws IOException if the superclass constructor rejects the arguments
     */
    public RMAdminProtocolMethod(Class<?>[] pTypes, Object ... pParams) throws IOException {
        super(pTypes, pParams);
    }

    /**
     * Entry point: invokes the caller's protocol method either on a single
     * sub-cluster or on all sub-clusters.
     *
     * @param interceptor supplies the configuration, the executor and the
     *        per-sub-cluster admin proxies
     * @param clazz expected type of each non-null result
     * @param subClusterId target sub-cluster; when blank (null, empty or
     *        whitespace) every sub-cluster is invoked concurrently
     * @param <R> result type
     * @return the non-null results, one per sub-cluster that returned a value
     * @throws YarnException if the invocation fails on any targeted sub-cluster
     */
    public <R> Collection<R> invokeConcurrent(FederationRMAdminInterceptor interceptor, Class<R> clazz, String subClusterId) throws YarnException {
        this.rmAdminInterceptor = interceptor;
        this.federationFacade = FederationStateStoreFacade.getInstance(interceptor.getConf());
        this.configuration = interceptor.getConf();
        // The two calls below must stay direct calls from this method: both
        // callees look up the protocol method name at a fixed stack depth.
        if (StringUtils.isNotBlank(subClusterId)) {
            return this.invoke(clazz, subClusterId);
        }
        return this.invokeConcurrent(clazz);
    }

    /**
     * Invokes the protocol method on every sub-cluster returned by
     * {@code getSubClusters(true)}, using the interceptor's executor, and
     * collects the non-null results ordered by sub-cluster id.
     *
     * @param clazz expected type of each non-null result
     * @param <R> result type
     * @return the non-null results; empty if there are no sub-clusters or all
     *         invocations returned null
     * @throws YarnException if the calling thread is interrupted while the
     *         tasks run, or if at least one sub-cluster invocation failed (the
     *         message lists the failed sub-cluster ids and the first failure
     *         is attached as the cause)
     */
    protected <R> Collection<R> invokeConcurrent(Class<R> clazz) throws YarnException {
        // Frame 3 is the caller of the public invokeConcurrent(...) overload:
        // [0] getStackTrace, [1] this method, [2] public overload, [3] caller.
        // Do not move this lookup into a helper; that would shift the depth.
        String methodName = Thread.currentThread().getStackTrace()[3].getMethodName();
        this.setMethodName(methodName);

        ThreadPoolExecutor executorService = this.rmAdminInterceptor.getExecutorService();
        Map<SubClusterId, ?> subClusterInfo = this.federationFacade.getSubClusters(true);

        // Snapshot the ids in a list so that task i / future i can always be
        // mapped back to its sub-cluster, even when the task itself failed.
        List<SubClusterId> targets = new ArrayList<>(subClusterInfo.keySet());
        List<Callable<Object>> callables = new ArrayList<>(targets.size());
        for (SubClusterId target : targets) {
            callables.add(() -> {
                ResourceManagerAdministrationProtocol protocol = this.rmAdminInterceptor.getAdminRMProxyForSubCluster(target);
                Class<?>[] types = this.getTypes();
                Object[] params = this.getParams();
                Method method = ResourceManagerAdministrationProtocol.class.getMethod(methodName, types);
                return method.invoke(protocol, params);
            });
        }

        List<Future<Object>> futures;
        try {
            // invokeAll returns the futures in the same order as the tasks.
            futures = executorService.invokeAll(callables);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
            throw new YarnException("invokeConcurrent Failed.", e);
        }

        Map<SubClusterId, R> results = new TreeMap<>();
        Map<SubClusterId, Exception> failures = new TreeMap<>();
        for (int i = 0; i < futures.size(); i++) {
            SubClusterId target = targets.get(i);
            try {
                Object result = futures.get(i).get();
                if (result != null) {
                    results.put(target, clazz.cast(result));
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.recordFailure(failures, methodName, target, e);
            }
            catch (ExecutionException e) {
                this.recordFailure(failures, methodName, target, e);
            }
        }

        if (!failures.isEmpty()) {
            Set<SubClusterId> failedIds = failures.keySet();
            throw new YarnException("invokeConcurrent Failed, An exception occurred in subClusterIds = " + StringUtils.join(failedIds, ","), failures.values().iterator().next());
        }
        return results.values();
    }

    /**
     * Invokes the protocol method on exactly one sub-cluster.
     *
     * @param clazz expected type of the result
     * @param subClusterId id of the target sub-cluster
     * @param <R> result type
     * @return a single-element list holding the result
     * @throws YarnException if {@code subClusterId} is not among the
     *         sub-clusters returned by {@code getSubClusters(true)}, if the
     *         invocation throws, or if the invocation returns null
     */
    protected <R> Collection<R> invoke(Class<R> clazz, String subClusterId) throws YarnException {
        // Same fixed-depth caller lookup as in invokeConcurrent(Class); it
        // must remain directly in this method.
        String methodName = Thread.currentThread().getStackTrace()[3].getMethodName();
        this.setMethodName(methodName);
        Map<SubClusterId, ?> subClusterInfoMap = this.federationFacade.getSubClusters(true);
        SubClusterId subClusterIdKey = SubClusterId.newInstance(subClusterId);
        if (!subClusterInfoMap.containsKey(subClusterIdKey)) {
            throw new YarnException("subClusterId = " + subClusterId + " is not an active subCluster.");
        }
        try {
            ResourceManagerAdministrationProtocol protocol = this.rmAdminInterceptor.getAdminRMProxyForSubCluster(subClusterIdKey);
            Class<?>[] types = this.getTypes();
            Object[] params = this.getParams();
            Method method = ResourceManagerAdministrationProtocol.class.getMethod(methodName, types);
            Object result = method.invoke(protocol, params);
            if (result != null) {
                return Collections.singletonList(clazz.cast(result));
            }
        }
        catch (Exception e) {
            throw new YarnException("invoke Failed, An exception occurred in subClusterId = " + subClusterId, e);
        }
        // Reached only when the protocol method returned null, which is
        // reported to the caller as a failure.
        throw new YarnException("invoke Failed, An exception occurred in subClusterId = " + subClusterId);
    }

    /**
     * Logs a failed sub-cluster invocation and remembers it under its id.
     */
    private void recordFailure(Map<SubClusterId, Exception> failures, String methodName, SubClusterId target, Exception failure) {
        Throwable root = RMAdminProtocolMethod.unwrap(failure);
        // Reflection wrappers usually carry no message of their own, so fall
        // back to toString() to always log something meaningful.
        String detail = root.getMessage() != null ? root.getMessage() : root.toString();
        LOG.error("Cannot execute {} on {}: {}", methodName, target, detail, root);
        failures.put(target, failure);
    }

    /**
     * Strips {@link ExecutionException} and {@link InvocationTargetException}
     * wrappers to reach the exception raised by the invoked method; returns
     * the argument itself when there is nothing to unwrap.
     */
    private static Throwable unwrap(Throwable failure) {
        Throwable current = failure;
        while ((current instanceof ExecutionException || current instanceof InvocationTargetException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }
}

