How To: Do Log Processing
Let's build a log processing framework which will take JSON structured logs, and count the number and types of errors that occur in real-time.
Set up a Gemfile
source 'https://rubygems.org'
gem 'sneakers'
gem 'json'
gem 'redis'
And a first iteration of a worker:
# boot.rb
require 'sneakers'
class Processor
  include Sneakers::Worker
  from_queue :logs

  def work(msg)
    logger.info msg
  end
end
Let's test it out quickly from the command line:
sneakers work Processor --require boot.rb
We just told Sneakers to spawn a worker named Processor, but first to --require a file that we dedicate to setting up the environment, workers, and what-not.
Since we did not pass --daemonize, the worker stays in the foreground. In general I don't daemonize processes unless I must; daemonizing tends to make failures less obvious. Instead, I run processes in the foreground and let the operating system manage them for me with Upstart, init.d, runit, systemd, or what have you.
I also like to run workers in their own shell and project; see How To: Running a stand-alone worker when you're ready. Without spoiling it, that guide presents a clean, modular worker project and shows how to run and maintain a worker as a first-class citizen.
Back to business. If you go to your RabbitMQ admin now, you'll see that a new queue named logs was created. Push a couple of messages, and this is the output you should see at your terminal:
2013-10-11T19:26:36Z p-4718 t-ovqgyb31o DEBUG: [worker-logs:1:213mmy][#<Thread:0x007fae6b05cc58>][logs][{:prefetch=>10, :durable=>true, :ack=>true, :heartbeat_interval=>2, :exchange=>"sneakers"}] Working off: log log
2013-10-11T19:26:36Z p-4718 t-ovqgyrxu4 INFO: log log
2013-10-11T19:26:40Z p-4719 t-ovqgyb364 DEBUG: [worker-logs:1:h23iit][#<Thread:0x007fae6b05cd98>][logs][{:prefetch=>10, :durable=>true, :ack=>true, :heartbeat_interval=>2, :exchange=>"sneakers"}] Working off: log log
2013-10-11T19:26:40Z p-4719 t-ovqgyrx8g INFO: log log
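If you don't have a producer handy, here's a minimal sketch for pushing a test message with Bunny (the AMQP client Sneakers runs on). The exchange name, type, and durability here are assumptions taken from the defaults shown in the debug line above, and the routing key is assumed to match the queue name:

# push_test.rb -- sketch: publish a test message for the Processor worker
require 'bunny'

conn = Bunny.new   # assumes RabbitMQ on localhost with default credentials
conn.start

channel  = conn.create_channel
# Sneakers binds the queue to its exchange using the queue name as the routing key
exchange = channel.direct('sneakers', durable: true)
exchange.publish('log log', routing_key: 'logs')

conn.close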
We're basically done with the ceremonies, and all that's left is to do some real work.
We'll count errors and error types with Redis. Specifically for an error that looks like this:
{
  "type": "error",
  "message": "HALP!",
  "error": "CODE001"
}
Let's update the worker code:
# boot.rb
require 'sneakers'
require 'redis'
require 'json'

$redis = Redis.new

class Processor
  include Sneakers::Worker
  from_queue :logs

  def work(msg)
    err = JSON.parse(msg)
    if err["type"] == "error"
      $redis.incr "processor:#{err["error"]}"
    end
    ack!
  end
end
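By the way, JSON.parse will raise on a malformed payload. A small variant of work (my sketch, not part of the original walkthrough) could reject such messages instead of letting the exception bubble up:

# Sketch: a defensive variant of Processor#work
def work(msg)
  err = JSON.parse(msg)
  $redis.incr "processor:#{err["error"]}" if err["type"] == "error"
  ack!
rescue JSON::ParserError
  reject!  # Sneakers' reject! helper; the default handler does not requeue it
end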
Since tracing is enabled with a debug-level logger, you'll see when a worker pulls a message off the queue, and what that message really was:
2013-10-11T19:38:49Z p-7545 t-oxeymqln8 DEBUG: [worker-logs:1:o49a6u][#<Thread:0x007febbc035c28>][logs][{:prefetch=>10, :durable=>true, :ack=>true, :heartbeat_interval=>2, :exchange=>"sneakers"}] Working off: {
"type": "error",
"message": "HALP!",
"error": "CODE001"
}
Soon you'll see this at your Redis end:
➜ ~ redis-cli monitor
1381520329.888581 [0 127.0.0.1:49182] "incr" "processor:CODE001"
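To read the counters back, any Redis client will do. Here's a quick sketch; the processor:* key pattern comes from the worker above, and KEYS is acceptable here only because the keyspace is tiny:

# report.rb -- sketch: dump the error counters collected by Processor
require 'redis'

redis = Redis.new
redis.keys('processor:*').sort.each do |key|
  puts "#{key}: #{redis.get(key)}"
end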
Let's use the logging_metrics provider, just for the fun of seeing the metrics as they happen.
# boot.rb
require 'sneakers'
require 'redis'
require 'json'
require 'sneakers/metrics/logging_metrics'
Sneakers.configure :metrics => Sneakers::Metrics::LoggingMetrics.new
# ... rest of code
Now push a message again and you'll see:
2013-10-11T19:44:37Z p-9219 t-oxh8owywg INFO: INC: work.Processor.started
2013-10-11T19:44:37Z p-9219 t-oxh8owywg INFO: TIME: work.Processor.time 0.00242
2013-10-11T19:44:37Z p-9219 t-oxh8owywg INFO: INC: work.Processor.handled.ack
These increment a counter when the work starts and when it is acked, and time each unit of work.
This is it for now. You've seen how to interactively build and tweak a worker. If you use the --daemonize flag, you'll have a production-ready processor, spitting logs into sneakers.log by default.
In production, you can also use the StatsdMetrics module to ship metrics to a Statsd server.
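Here's a rough sketch of what that might look like; the require path, the client gem, and the constructor argument are assumptions, so check the module's source for the exact wiring:

# boot.rb (production) -- sketch: ship metrics to Statsd instead of the log
require 'sneakers'
require 'sneakers/metrics/statsd_metrics'  # assumed require path
require 'statsd'                           # statsd-ruby gem (assumption)

statsd = Statsd.new('127.0.0.1', 8125)     # host and port of your Statsd server
Sneakers.configure :metrics => Sneakers::Metrics::StatsdMetrics.new(statsd)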