diff --git a/src/main/java/redis/clients/jedis/JedisSentinelSlaveFactory.java b/src/main/java/redis/clients/jedis/JedisSentinelSlaveFactory.java new file mode 100644 index 0000000000..d980f98c8c --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisSentinelSlaveFactory.java @@ -0,0 +1,137 @@ +package redis.clients.jedis; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.PooledObjectFactory; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.exceptions.JedisException; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLSocketFactory; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicReference; + +public class JedisSentinelSlaveFactory implements PooledObjectFactory { + private static final Logger logger = LoggerFactory.getLogger(JedisSentinelSlaveFactory.class); + private final String masterName; + private final int retryTime = 5; + + private final AtomicReference> slavesHostAndPort = new AtomicReference<>(); + private final int connectionTimeout; + private final int soTimeout; + private final String password; + private final int database; + private final String clientName; + private final boolean ssl; + private final SSLSocketFactory sslSocketFactory; + private SSLParameters sslParameters; + private HostnameVerifier hostnameVerifier; + + public JedisSentinelSlaveFactory(final int connectionTimeout, final int soTimeout, final String password, final int database, final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier, String masterName) { + this.connectionTimeout = connectionTimeout; + this.soTimeout = soTimeout; + this.password = password; + this.database = database; + this.clientName = clientName; + this.ssl = ssl; + this.sslSocketFactory = sslSocketFactory; + this.sslParameters = sslParameters; + this.hostnameVerifier = hostnameVerifier; + this.masterName = masterName; + } + + public void setSlavesHostAndPort(final List slaveHostAndPort) { + if (slaveHostAndPort == null || slaveHostAndPort.size() == 0) { + return; + } + this.slavesHostAndPort.set(slaveHostAndPort); + } + + @Override + public void activateObject(PooledObject pooledJedis) throws Exception { + final Jedis jedis = pooledJedis.getObject(); + if (jedis.getDB() != database) { + jedis.select(database); + } + + } + + @Override + public void destroyObject(PooledObject pooledJedis) throws Exception { + final Jedis jedis = pooledJedis.getObject(); + if (jedis.isConnected()) { + try { + try { + jedis.close(); + } catch (Exception e) { + logger.debug("Error while close", e); + } + jedis.disconnect(); + } catch (Exception e) { + logger.debug("Error while disconnect", e); + } + } + + } + + @Override + public PooledObject makeObject() throws Exception { + final List slaves = slavesHostAndPort.get(); + if (slaves == null || slaves.isEmpty()) { + throw new JedisException(String.format("No valid slave for master: %s,slave:%s", this.masterName, this.slavesHostAndPort)); + } + DefaultPooledObject result = tryToGetSlave(slaves); + if (null != result) { + return result; + } else { + throw new JedisException(String.format("No valid slave for master: %s, after try %d times.", this.masterName, retryTime)); + } + } + + private DefaultPooledObject tryToGetSlave(List slaves) { + int retry = retryTime; + while (retry >= 0) { + retry--; + int randomIndex = ThreadLocalRandom.current().nextInt(slaves.size()); + String host = slaves.get(randomIndex).getHost(); + int port = slaves.get(randomIndex).getPort(); + final Jedis jedisSlave = new Jedis(host, port, connectionTimeout, soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier); + try { + jedisSlave.connect(); + if (null != this.password) { + jedisSlave.auth(this.password); + } + if (database != 0) { + jedisSlave.select(database); + } + if (clientName != null) { + jedisSlave.clientSetname(clientName); + } + return new DefaultPooledObject<>(jedisSlave); + + } catch (Exception e) { + jedisSlave.close(); + logger.error("tryToGetSlave error ", e); + } + } + return null; + } + + @Override + public void passivateObject(PooledObject pooledJedis) throws Exception { + // TODO maybe should select db 0? Not sure right now. + } + + @Override + public boolean validateObject(PooledObject pooledJedis) { + final Jedis jedis = pooledJedis.getObject(); + try { + return jedis.isConnected() && jedis.ping().equals("PONG"); + } catch (final Exception e) { + return false; + } + } +} \ No newline at end of file diff --git a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java new file mode 100644 index 0000000000..f66f00bd1f --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java @@ -0,0 +1,298 @@ +package redis.clients.jedis; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisException; +import redis.clients.jedis.util.Pool; + +import java.security.InvalidParameterException; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +public class JedisSentinelSlavePool extends Pool { + + protected Logger logger = LoggerFactory.getLogger(JedisSentinelSlavePool.class.getName()); + + protected GenericObjectPoolConfig poolConfig; + protected final Collection masterListeners = new ArrayList<>(); + private final JedisSentinelSlaveFactory factory; + private volatile HostAndPort currentHostMaster; + private volatile List currentSlaves; + private final Object initPoolLock = new Object(); + private final JedisClientConfig sentinelClientConfig; + private final Set sentinels; + + public JedisSentinelSlavePool(String masterName, Set sentinels, final GenericObjectPoolConfig poolConfig, final int timeout, final int soTimeout, final String password, final int database, final String sentinelPassword) { + this(masterName, sentinels, poolConfig, timeout, soTimeout, password, database, null, DefaultJedisClientConfig.builder().connectionTimeoutMillis(timeout).socketTimeoutMillis(timeout).password(sentinelPassword).build(), new JedisSentinelSlaveFactory(timeout, soTimeout, password, database, null, false, null, null, null, masterName)); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, final String password, final int database, final String clientName, final JedisClientConfig sentinelClientConfig, JedisSentinelSlaveFactory factory) { + + super(poolConfig, factory); + this.sentinelClientConfig = sentinelClientConfig; + this.poolConfig = poolConfig; + this.factory = factory; + this.sentinels = parseHostAndPorts(sentinels); + HostAndPort master = initsentinels(this.sentinels, masterName); + initPool(master); + } + + private static Set parseHostAndPorts(Set strings) { + return strings.stream().map(HostAndPort::from).collect(Collectors.toSet()); + } + + private HostAndPort toHostAndPort(List getMasterAddrByNameResult) { + String host = getMasterAddrByNameResult.get(0); + int port = Integer.parseInt(getMasterAddrByNameResult.get(1)); + + return new HostAndPort(host, port); + } + + @Override + public void destroy() { + for (MasterListener m : masterListeners) { + m.shutdown(); + } + super.destroy(); + } + + @Override + public Jedis getResource() { + while (true) { + Jedis jedis = super.getResource(); + jedis.setDataSource(this); + + final List slaves = currentSlaves; + final HostAndPort connection = jedis.getConnection().getHostAndPort(); + + if (slaves.isEmpty()) { + logger.debug("slave node is empty! host:{},port:{}", connection.getHost(), connection.getPort()); + returnBrokenResource(jedis); + } + if (slaves.contains(connection)) { + return jedis; + } else { + returnBrokenResource(jedis); + } + } + } + + private void initPool(HostAndPort master) { + synchronized (initPoolLock) { + if (!master.equals(currentHostMaster)) { + currentHostMaster = master; + // update newest slaves + factory.setSlavesHostAndPort(currentSlaves); + // although we clear the pool, we still have to check the returned object in getResource, + // this call only clears idle instances, not borrowed instances + super.clear(); + + logger.info("Created JedisSentinelPool to master at {}", master); + } + } + } + + public void setCurrentSlaves(List currentSlaves) { + if (currentSlaves == null || currentSlaves.size() == 0) { + return; + } + this.currentSlaves = currentSlaves; + } + + public List getCurrentSlaves() { + return currentSlaves; + } + + private HostAndPort initsentinels(Set sentinels, final String masterName) { + + HostAndPort master = null; + boolean sentinelAvailable = false; + + logger.info("Trying to find a valid sentinel from available Sentinels " + masterName); + + for (HostAndPort hap : sentinels) { + + logger.info("Connecting to Sentinel " + hap + ",masterName = " + masterName); + + try (Jedis jedis = new Jedis(hap, sentinelClientConfig)) { + sentinelAvailable = true; + + List masterAddr = jedis.sentinelGetMasterAddrByName(masterName); + if (masterAddr == null || masterAddr.size() != 2) { + logger.warn("Can not get master addr from sentinel, master name: " + masterName + ". Sentinel: " + hap + "."); + continue; + } + + master = toHostAndPort(masterAddr); + + //init currentSlaves + List> slaves = jedis.sentinelReplicas(masterName); + logger.info(masterName + " sentinelSlaves:" + slaves); + if (slaves == null || slaves.size() == 0) { + continue; + } + // filter status is down + this.setCurrentSlaves(slaves.stream().filter(this::checkNodeStatus) + .map(slave -> HostAndPort.from(slave.get("ip") + ":" + slave.get("port"))) + .collect(Collectors.toList())); + logger.info(masterName + " set currentSlaves " + currentSlaves); + + logger.debug("Found Redis master at {}", master); + + break; + } catch (JedisException e) { + logger.warn("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e + ". Trying next one."); + } + } + + if (master == null) { + if (sentinelAvailable) { + // can connect to sentinel, but master name seems to not monitored + throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored..."); + } else { + throw new JedisConnectionException("All sentinels down, cannot determine where is " + masterName + " master is running..."); + } + } + + logger.info("Redis master running at {}, starting Sentinel listeners...", master); + if (masterListeners.size() == 0) { + for (HostAndPort hap : sentinels) { + MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort()); + // whether MasterListener threads are alive or not, process can be stopped + masterListener.setDaemon(true); + masterListeners.add(masterListener); + masterListener.start(); + } + } + + return master; + } + + private boolean checkNodeStatus(Map slave) { + return !slave.get("flags").contains("s_down") && "ok".equals(slave.get("master-link-status")); + } + + @Override + public void returnResource(final Jedis resource) { + if (resource != null) { + try { + resource.resetState(); + super.returnResource(resource); + } catch (RuntimeException e) { + returnBrokenResource(resource); + logger.debug("Resource is returned to the pool as broken", e); + } + + } + } + + protected class MasterListener extends Thread { + + protected String masterName; + protected String host; + protected int port; + protected long subscribeRetryWaitTimeMillis = 5000; + protected volatile Jedis j; + protected AtomicBoolean running = new AtomicBoolean(false); + + protected MasterListener() { + } + + public MasterListener(String masterName, String host, int port) { + super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port)); + this.masterName = masterName; + this.host = host; + this.port = port; + } + + public MasterListener(String masterName, String host, int port, long subscribeRetryWaitTimeMillis) { + this(masterName, host, port); + this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; + } + + @Override + public void run() { + + running.set(true); + + while (running.get()) { + final HostAndPort hostPort = new HostAndPort(host, port); + j = new Jedis(hostPort, sentinelClientConfig); + + try { + // double check that it is not being shutdown + if (!running.get()) { + break; + } + + j.subscribe(new SentinelSlaveChangePubSub(), "+switch-master", "+slave", "+sdown", "+odown", "+reboot"); + + } catch (JedisConnectionException e) { + + if (running.get()) { + logger.error("Lost connection to Sentinel at " + host + ":" + port + ". Sleeping 5000ms and retrying.", e); + try { + Thread.sleep(subscribeRetryWaitTimeMillis); + } catch (InterruptedException e1) { + logger.info("Sleep interrupted: ", e1); + } + } else { + logger.info("Unsubscribing from Sentinel at " + host + ":" + port); + } + } finally { + j.close(); + } + } + } + + public void shutdown() { + try { + logger.info("Shutting down listener on " + host + ":" + port); + running.set(false); + // This isn't good, the Jedis object is not thread safe + if (j != null) { + j.disconnect(); + } + } catch (Exception e) { + logger.error("Caught exception while shutting down: ", e); + } + } + + private class SentinelSlaveChangePubSub extends JedisPubSub { + @Override + public void onMessage(String channel, String message) { + if (masterName == null) { + logger.error("Master Name is null!"); + throw new InvalidParameterException("Master Name is null!"); + } + logger.info("Get message on chanel[" + channel + "], published [" + message + "]" + ". current sentinel " + host + ":" + port); + + String[] msg = message.split(" "); + List msgList = Arrays.asList(msg); + if (msgList.isEmpty()) { + return; + } + boolean needResetPool = masterName.equalsIgnoreCase(msgList.get(0)); + int tmpIndex = msgList.indexOf("@") + 1; + if (tmpIndex > 0 && masterName.equalsIgnoreCase(msgList.get(tmpIndex))) { //message from other channels + needResetPool = true; + } + if (needResetPool) { + // sleep 1s Ensure sentinel status is updated + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + logger.error("initSentinels error"); + } + HostAndPort master = initsentinels(sentinels, masterName); + initPool(master); + } else { + logger.info("message is not for master " + masterName); + } + } + } + } +} \ No newline at end of file diff --git a/src/test/java/redis/clients/jedis/JedisSentineSlavePoolTest.java b/src/test/java/redis/clients/jedis/JedisSentineSlavePoolTest.java new file mode 100644 index 0000000000..b60a3bb2d9 --- /dev/null +++ b/src/test/java/redis/clients/jedis/JedisSentineSlavePoolTest.java @@ -0,0 +1,86 @@ +package redis.clients.jedis; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** + * Created by PENG on 2024/4/3 17:34 + * + * @author admin + */ +public class JedisSentineSlavePoolTest { + private static final String MASTER_NAME = "mymaster"; + + protected static final HostAndPort sentinel0 = HostAndPorts.getSentinelServers().get(0); + protected static final HostAndPort sentinel1 = HostAndPorts.getSentinelServers().get(1); + protected static final HostAndPort sentinel2 = HostAndPorts.getSentinelServers().get(2); + + protected final Set sentinels = new HashSet<>(); + + @Before + public void setUp() throws Exception { + sentinels.clear(); + + sentinels.add(sentinel0.toString()); + sentinels.add(sentinel1.toString()); + sentinels.add(sentinel2.toString()); + } + + @Test + public void repeatedSentinelPoolInitialization() { + + for (int i = 0; i < 20; ++i) { + GenericObjectPoolConfig config = new GenericObjectPoolConfig<>(); + JedisSentinelSlavePool slavePool = new JedisSentinelSlavePool(MASTER_NAME, sentinels, config, + 1000, 1000, "hellojedis", 0, null); + + slavePool.getResource().close(); + slavePool.destroy(); + } + } + + @Test + public void checkResourceIsCloseable() { + GenericObjectPoolConfig config = new GenericObjectPoolConfig<>(); + config.setMaxTotal(1); + config.setBlockWhenExhausted(false); + JedisSentinelSlavePool pool = new JedisSentinelSlavePool(MASTER_NAME, sentinels, config, 1000, + 1000, "hellojedis", 0, null); + + Jedis jedis = pool.getResource(); + try { +// redis.clients.jedis.exceptions.JedisDataException: READONLY You can't write against a read only replica. +// jedis.set("hello", "jedis"); + } finally { + jedis.close(); + } + + Jedis jedis2 = pool.getResource(); + try { + assertEquals(jedis, jedis2); + } finally { + jedis2.close(); + } + } + + @Test + public void checkReadFromSlave() { + + GenericObjectPoolConfig config = new GenericObjectPoolConfig<>(); + JedisSentinelSlavePool slavePool = new JedisSentinelSlavePool(MASTER_NAME, sentinels, config, 1000, + 1000, "hellojedis", 0, null); + for (int i = 0; i < 10; i++) { + Jedis jedis = slavePool.getResource(); + jedis.get("key"); //Random read from slave + } + slavePool.getResource().close(); + slavePool.destroy(); + } +}