Sharded pubsub subscriber cannot get messages from all subscribed channels #3133

Open
thx123 opened this issue Sep 25, 2024 · 0 comments

Expected Behavior

A sharded pubsub subscriber should receive messages from every channel it is subscribed to.

Current Behavior

A sharded pubsub subscriber only receives messages from the shard that owns the first subscribed channel. Messages published to subscribed channels on other shards never arrive.

Possible Solution

Have the sharded pubsub subscriber listen on every shard that owns at least one of its subscribed channels.

Steps to Reproduce

  1. Start a Redis server with cluster mode enabled.
  2. Run the Go script given below.
  3. Observe that the sharded subscriber only receives messages from a subset of the channels it has subscribed to.

Context (Environment)

This happens across different environments as long as a Redis cluster is used.

Detailed Description

Code to reproduce this bug:

package main

import (
    "context"
    "flag"
    "fmt"
    "os"

    "github.com/google/logger"
    "github.com/redis/go-redis/v9"
)

const logPath = "./ssub.log"

var verbose = flag.Bool("verbose", true, "print info level logs to stdout")

func main() {
    lf, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0660)
    if err != nil {
        logger.Fatalf("Failed to open log file: %v", err)
    }
    defer lf.Close()
    defer logger.Init("SsubLogger", *verbose, true, lf).Close()
    logger.Info("")

    // Create a new RedisCluster client
    rdb := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs: []string{
            "localhost:6379",
        },
    })

    // Ping the Redis Cluster to ensure connectivity
    if err := rdb.Ping(context.Background()).Err(); err != nil {
        logger.Fatalf("Could not connect to Redis Cluster: %v", err)
    }
    logger.Info("RedisCluster client is ready")

    pubsub := rdb.SSubscribe(context.Background())
    defer pubsub.Close()

    // Ssubscribe to ch 1 - 9
    for i := 1; i <= 9; i++ {
        ch := fmt.Sprintf("ch%d", i)

        pubsub.SSubscribe(context.Background(), ch)
        logger.Info("Subscribed to ", ch)

        rdb.SPublish(context.Background(), ch, "Hello")
    }

    chss, _ := rdb.PubSubShardChannels(context.Background(), "ch*").Result()
    logger.Info("Currently subscribed channels: ", chss)

    for {
        // logger.Info("Waiting for sharded Redis pub-sub messages")
        // ReceiveMessage will crash when SSubscribe is used.
        // msg, err := pubsub.ReceiveMessage(context.Background())
        // if err != nil {
        //  logger.Info(err)
        // }
        msg := <-pubsub.Channel()
        logger.Infof("Received %s %s", msg.Channel, msg.Payload)
    }
}

Output:

INFO : 2024/09/25 15:35:14.774884 ssub.go:24:
INFO : 2024/09/25 15:35:14.781693 ssub.go:37: RedisCluster client is ready
INFO : 2024/09/25 15:35:14.782690 ssub.go:47: Subscribed to ch1
INFO : 2024/09/25 15:35:14.783934 ssub.go:47: Subscribed to ch2
INFO : 2024/09/25 15:35:14.784396 ssub.go:47: Subscribed to ch3
INFO : 2024/09/25 15:35:14.784838 ssub.go:47: Subscribed to ch4
INFO : 2024/09/25 15:35:14.785478 ssub.go:47: Subscribed to ch5
INFO : 2024/09/25 15:35:14.785714 ssub.go:47: Subscribed to ch6
INFO : 2024/09/25 15:35:14.785889 ssub.go:47: Subscribed to ch7
INFO : 2024/09/25 15:35:14.786079 ssub.go:47: Subscribed to ch8
INFO : 2024/09/25 15:35:14.786246 ssub.go:47: Subscribed to ch9
INFO : 2024/09/25 15:35:14.786489 ssub.go:53: Currently subscribed channels: []
INFO : 2024/09/25 15:35:14.786744 ssub.go:63: Received ch1 Hello
INFO : 2024/09/25 15:35:14.786808 ssub.go:63: Received ch2 Hello
INFO : 2024/09/25 15:35:14.887868 ssub.go:63: Received ch5 Hello
INFO : 2024/09/25 15:35:14.888170 ssub.go:63: Received ch6 Hello
INFO : 2024/09/25 15:35:14.989075 ssub.go:63: Received ch9 Hello

Possible Implementation
