package com.aerospike.spark;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.Log;
import com.aerospike.client.Value;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.spark.policy.ClientPolicyBuilder$;
import com.aerospike.spark.policy.ScanPolicyBuilder$;
import com.aerospike.spark.policy.TlsPolicyBuilder$;
import com.aerospike.spark.policy.WritePolicyBuilder$;
import com.aerospike.spark.query.EventLoopProvider$;
import com.aerospike.spark.utility.HelperFunctions$;
import com.aerospike.spark.utility.ServerUtil$;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext$;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.RuntimeConfig;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.ArrayOps$;
import scala.collection.StringOps$;
import scala.reflect.ClassTag$;

/* compiled from: AerospikeConnection.scala */
/* loaded from: input_file:com/aerospike/spark/AerospikeConnection$.class */
/**
 * Process-wide pool of {@link AerospikeClient} instances, keyed by the
 * {@link ClientProperties} derived from an {@link AerospikeConfig}.
 *
 * <p>This is the module (singleton) class of the Scala object
 * {@code AerospikeConnection}; the single instance is {@link #MODULE$}.
 *
 * <p>All methods that read or modify the pool are {@code synchronized} on the
 * singleton, so pool lookups, client creation and {@link #closeAll()} never
 * interleave with each other.
 */
public final class AerospikeConnection$ implements Logging {
    public static final AerospikeConnection$ MODULE$ = new AerospikeConnection$();
    /** Pooled clients per cache key; clients are handed out in round-robin order. */
    private static final ConcurrentHashMap<ClientProperties, ConcurrentLinkedDeque<AerospikeClient>> clientCache;
    /** Number of {@code getClient} calls served per cache key (never decremented in this class). */
    private static final ConcurrentHashMap<ClientProperties, AtomicInteger> clientCacheLedger;
    private static transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        Logging.$init$(MODULE$);
        clientCache = new ConcurrentHashMap<>();
        clientCacheLedger = new ConcurrentHashMap<>();
    }

    // ---------------------------------------------------------------------
    // Forwarders to the Spark Logging trait implementation.
    // ---------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        org$apache$spark$internal$Logging$$log_ = logger;
    }

    private ConcurrentHashMap<ClientProperties, ConcurrentLinkedDeque<AerospikeClient>> clientCache() {
        return clientCache;
    }

    private ConcurrentHashMap<ClientProperties, AtomicInteger> clientCacheLedger() {
        return clientCacheLedger;
    }

    /**
     * Returns a pooled or newly created client for the settings in {@code sparkConf}.
     *
     * @param sparkConf Spark configuration converted via {@code AerospikeConfig.apply}
     * @return a client obtained from {@link #getClient(AerospikeConfig)}
     */
    public synchronized AerospikeClient getClient(SparkConf sparkConf) {
        // getAll() returns an array; render its contents instead of the array identity.
        // NOTE(review): the dumped settings may include credentials - confirm this is
        // acceptable at debug level (the RuntimeConfig overload already dumps its map).
        logDebug(() -> "getClient using sparkConf: " + java.util.Arrays.toString(sparkConf.getAll()));
        return getClient(AerospikeConfig$.MODULE$.apply(sparkConf));
    }

    /**
     * Returns a pooled or newly created client for the settings in {@code runtimeConfig}.
     *
     * @param runtimeConfig runtime configuration converted via {@code AerospikeConfig.apply}
     * @return a client obtained from {@link #getClient(AerospikeConfig)}
     */
    public synchronized AerospikeClient getClient(RuntimeConfig runtimeConfig) {
        logDebug(() -> "getClient using RuntimeConfig: " + runtimeConfig.getAll());
        return getClient(AerospikeConfig$.MODULE$.apply(runtimeConfig));
    }

    /**
     * Returns a client for {@code aerospikeConfig}.
     *
     * <p>A new client is created while the pool for this configuration holds fewer than
     * {@code aerospikeConfig.clientPoolSize()} clients. Once the pool is full, the client
     * at the head of the pool is moved to the tail and returned; a head client that is
     * no longer connected is dropped from the pool and replaced by a new one.
     *
     * <p>Side effect: sets the global {@code Value.UseBoolBin} flag to {@code true} when the
     * configuration asks for boolean bins and the server reports support for them.
     *
     * @param aerospikeConfig connector configuration; also determines the cache key
     * @return a client registered in the pool
     */
    public synchronized AerospikeClient getClient(AerospikeConfig aerospikeConfig) {
        final ClientProperties cacheKey = new ClientProperties(aerospikeConfig);
        final ConcurrentLinkedDeque<AerospikeClient> pool = clientCache().get(cacheKey);

        final AerospikeClient client;
        if (pool == null) {
            logInfo(() -> "found no client in pool, machine: " + HelperFunctions$.MODULE$.getMachineName()
                    + " cachekey: " + cacheKey.toString());
            client = newClient(aerospikeConfig);
        } else if (pool.size() < aerospikeConfig.clientPoolSize()) {
            logInfo(() -> "client pool size have " + pool.size() + ", cachekey: " + cacheKey
                    + " clients, will create a new one");
            client = newClient(aerospikeConfig);
        } else {
            client = reuseOrReplace(pool, cacheKey, aerospikeConfig);
        }

        clientCacheLedger().computeIfAbsent(cacheKey, key -> new AtomicInteger()).incrementAndGet();

        if (aerospikeConfig.useBooleanBin()
                && ServerUtil$.MODULE$.isFeatureSupported(client, ServerUtil$.MODULE$.BooleanSupportVersion())) {
            Value.UseBoolBin = true;
        }
        return client;
    }

    /**
     * Takes the client at the head of {@code pool}; returns it (re-queued at the tail) when
     * it is still connected, otherwise creates a replacement via {@link #newClient}.
     */
    private AerospikeClient reuseOrReplace(ConcurrentLinkedDeque<AerospikeClient> pool,
                                           ClientProperties cacheKey,
                                           AerospikeConfig aerospikeConfig) {
        // pollFirst() rather than removeFirst(): an empty pool (possible when the configured
        // pool size is <= 0) must not surface as NoSuchElementException.
        final AerospikeClient candidate = pool.pollFirst();
        if (candidate == null) {
            logWarning(() -> "client pool is empty for cachekey: " + cacheKey + ", will create a new one");
            return newClient(aerospikeConfig);
        }
        if (candidate.isConnected()) {
            logInfo(() -> "cachedClient: " + candidate + ", partition-id: " + TaskContext$.MODULE$.getPartitionId()
                    + ", cachekey: " + cacheKey.toString());
            pool.addLast(candidate);
            return candidate;
        }
        // NOTE(review): the evicted client is not closed here because other holders may still
        // reference it - confirm whether it should be closed to release its resources.
        logWarning(() -> "found closed cached client " + candidate + " in pool with cachekey: " + cacheKey
                + ", removed.");
        return newClient(aerospikeConfig);
    }

    /**
     * Creates, validates and pools a new client for {@code aerospikeConfig}.
     *
     * <p>The client is only added to the pool after it is confirmed connected and the
     * feature key has been validated; if either step fails the client is closed before
     * the failure propagates, so no half-initialised client is leaked or cached.
     *
     * @throws IllegalArgumentException if a seed host entry is malformed or the client
     *                                  is not connected after construction
     */
    private AerospikeClient newClient(AerospikeConfig aerospikeConfig) {
        final Level connectorLevel = aerospikeConfig.getLogLevel();
        LogManager.getLogger("com.aerospike.spark").setLevel(connectorLevel);
        Log.setCallback(new AerospikeJavaClientLogger());
        Log.setLevel(toClientLogLevel(connectorLevel));

        final String seedHosts = aerospikeConfig
                .getIfNotEmpty(AerospikeConfig$.MODULE$.SeedHost(), "127.0.0.1:3000").toString().trim();
        final String configuredTlsName = aerospikeConfig
                .getIfNotEmpty(AerospikeTLSConstants$.MODULE$.TlsName(), "").toString().trim();
        // The Host constructor receives null (not "") when no TLS name is configured.
        final String tlsName = configuredTlsName.isEmpty() ? null : configuredTlsName;

        final ClientPolicy policy = ClientPolicyBuilder$.MODULE$
                .apply(aerospikeConfig, EventLoopProvider$.MODULE$.getEventLoops()).buildClientPolicy();
        policy.scanPolicyDefault = ScanPolicyBuilder$.MODULE$.apply(aerospikeConfig).getScanPolicy();
        policy.tlsPolicy = TlsPolicyBuilder$.MODULE$.apply(aerospikeConfig).getTlsPolicy();
        policy.writePolicyDefault = WritePolicyBuilder$.MODULE$.apply(aerospikeConfig).getWritePolicy();

        final AerospikeClient client = new AerospikeClient(policy, parseSeedHosts(seedHosts, tlsName));
        boolean usable = false;
        try {
            Predef$.MODULE$.require(client.isConnected(), () -> {
                return "either client is null or not connected";
            });
            verifyFeatureFile(client);
            usable = true;
        } finally {
            if (!usable) {
                // Never pooled and never returned: release it here or it is lost for good.
                client.close();
            }
        }

        final ClientProperties cacheKey = new ClientProperties(aerospikeConfig);
        final ConcurrentLinkedDeque<AerospikeClient> pool =
                clientCache().computeIfAbsent(cacheKey, key -> new ConcurrentLinkedDeque<>());
        logInfo(() -> "for cachekey " + cacheKey + ", retrieved set have " + pool.size() + " items");
        pool.addLast(client);
        logInfo(() -> "cachekey: " + cacheKey + ", new client instance: " + client + ", partition-id: "
                + TaskContext$.MODULE$.getPartitionId() + ",  machine: " + HelperFunctions$.MODULE$.getMachineName()
                + " ");
        return client;
    }

    /**
     * Maps the connector's log4j level onto the Aerospike Java client's log level:
     * DEBUG/TRACE/ALL map to DEBUG, ERROR/OFF map to ERROR, WARN maps to WARN,
     * and anything else (including {@code null}) maps to INFO.
     */
    private static Log.Level toClientLogLevel(Level connectorLevel) {
        if (Level.DEBUG.equals(connectorLevel) || Level.TRACE.equals(connectorLevel)
                || Level.ALL.equals(connectorLevel)) {
            return Log.Level.DEBUG;
        }
        if (Level.ERROR.equals(connectorLevel) || Level.OFF.equals(connectorLevel)) {
            return Log.Level.ERROR;
        }
        if (Level.WARN.equals(connectorLevel)) {
            return Log.Level.WARN;
        }
        return Log.Level.INFO;
    }

    /**
     * Parses a comma-separated list of {@code host[:port]} entries into {@link Host}s.
     *
     * <p>The port defaults to 3000 unless the entry has exactly two colon-separated parts.
     * Surrounding whitespace of each entry, host and port is ignored.
     *
     * @throws IllegalArgumentException if a host is empty or a port is less than 1
     * @throws NumberFormatException    if a port is not an integer
     */
    private static Host[] parseSeedHosts(String seedHosts, String tlsName) {
        final String[] entries = seedHosts.split(",");
        final Host[] hosts = new Host[entries.length];
        for (int i = 0; i < entries.length; i++) {
            final String[] parts = entries[i].trim().split(":");
            // split(":") yields an empty array for an entry such as ":"; treat it as an empty host.
            final String hostName = parts.length > 0 ? parts[0].trim() : "";
            final int port = parts.length == 2 ? Integer.parseInt(parts[1].trim()) : 3000;
            Predef$.MODULE$.require(port >= 1 && !hostName.isEmpty(), () -> {
                return "host:" + hostName + " port:" + port + " is not correct";
            });
            hosts[i] = new Host(hostName, tlsName, port);
        }
        return hosts;
    }

    /**
     * Checks that {@code aerospikeClient} is connected and validates the feature key
     * against the server via {@code ServerUtil.validateFeatureKeyWithServer}.
     *
     * @throws IllegalArgumentException if the client is not connected
     */
    private void verifyFeatureFile(AerospikeClient aerospikeClient) {
        Predef$.MODULE$.require(aerospikeClient.isConnected(), () -> {
            return "client instance " + aerospikeClient + " is not connected!";
        });
        ServerUtil$.MODULE$.validateFeatureKeyWithServer(aerospikeClient);
    }

    /**
     * Closes every pooled client and empties both the pool and the ledger.
     *
     * <p>Synchronized like the {@code getClient} methods so that a client cannot be
     * registered between closing the pooled clients and clearing the maps.
     */
    public synchronized void closeAll() {
        for (ConcurrentLinkedDeque<AerospikeClient> pool : clientCache().values()) {
            for (AerospikeClient client : pool) {
                client.close();
            }
        }
        clientCache().clear();
        clientCacheLedger().clear();
    }

    private AerospikeConnection$() {
    }
}
