⚡ Improve your application's performance by consuming your Symfony Messenger messages with Go.
- Consume your messages directly with Go code
- PostgreSQL support
- AMQP support
- Redis support
Install gosumer with Go
go get github.com/romaixn/gosumer
Add this to your config/packages/messenger.yaml
:
framework:
messenger:
transports:
go: # Add this new transport
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
serializer: 'messenger.transport.symfony_serializer' # Required, https://symfony.com/doc/current/messenger.html#serializing-messages
options:
use_notify: true
check_delayed_interval: 60000
queue_name: go # Required, used to only get right messages in go side
retry_strategy:
max_retries: 3
multiplier: 2
Don't forget to specify in the routing
part the message to process in Go
Create an env variable to create a custom queue (in this example go
is the name of the queue):
RABBITMQ_GO_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/go
And use it in config/packages/messenger.yaml
:
framework:
messenger:
transports:
go:
dsn: '%env(RABBITMQ_GO_TRANSPORT_DSN)%'
serializer: 'messenger.transport.symfony_serializer'
retry_strategy:
max_retries: 3
multiplier: 2
Create an env variable for Redis:
REDIS_TRANSPORT_DSN=redis://localhost:6379/messages
Add the following to your config/packages/messenger.yaml
:
framework:
messenger:
transports:
async:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options: []
Make sure to specify the message routing in the routing
section to process in Go.
For PostgreSQL:
database := gosumer.PgDatabase{
Host: "localhost",
Port: 5432,
User: "app",
Password: "!ChangeMe!",
Database: "app",
TableName: "messenger_messages",
}
If you are using a custom schema, you can specify it with backticks:
database := gosumer.PgDatabase{
Host: "localhost",
Port: 5432,
User: "app",
Password: "!ChangeMe!",
Database: "app",
TableName: `"myschema"."messenger_messages"`,
}
For RabbitMQ:
database := gosumer.RabbitMQ{
Host: "localhost",
Port: nil,
User: "guest",
Password: "guest",
Queue: "go",
}
For Redis:
database := gosumer.Redis{
Host: "localhost",
Port: 6379,
User: "username",
Password: "password",
DB: 0,
Channel: "channel_name",
}
Call the Listen
// Define your own structure according to your message
type Message struct {
ID int `json:"id"`
Number int `json:"number"`
}
err := gosumer.Listen(database, process, Message{})
if err != nil {
log.Fatal(err)
}
With the function to process your messages:
func process(message any, err chan error) {
log.Printf("Message received: %v", message)
// No error
err <- nil
// if there is an error, used to not delete message if an error occured
// err <- errors.New("Error occured !")
}