Skip to content

Commit

Permalink
finagle/mux: Add a stat for the received request context size
Browse files Browse the repository at this point in the history
Problem

We want to be able to see the sizes of received contexts.

Solution

Add a stat, <server_label>/mux/request_context_bytes, that
records the size of the context for each received request.

Differential Revision: https://phabricator.twitter.biz/D1179098
  • Loading branch information
jcrossley authored and jenkins committed Oct 25, 2024
1 parent eb41122 commit d173acb
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 15 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ New Features
~~~~~~~~~~

* finagle-mysql: Added support for LONG_BLOB data type. ``PHAB_ID=D1152247``
* finagle-mux: Added a stat, <server_label>/mux/request_context_bytes, that shows the size of the received
context per request.


Bug Fixes
Expand All @@ -39,6 +41,7 @@ Breaking API Changes
`TrackWorkerPoolExcutionDelay`, `c.t.f.netty4.param.TrackWorkerPoolExecutionDelay` has been renamed
to `TrackWorkerPool`. These changes reflect the tracker's new functionality of collecting metrics
and data other than the execution delay (see Runtime Behaviour Changes). ``PHAB_ID=D1176906``


24.5.0
------
Expand Down
3 changes: 3 additions & 0 deletions doc/src/sphinx/metrics/Mux.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
not have a corresponding request. This happens when a server has already
responded to the request when it receives a Tdiscard.

**<server_label>/mux/request_context_bytes**
A stat of the size of the received context per request.

**clienthangup**
A counter of the number of times sessions have been abruptly terminated by
the client.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
package com.twitter.finagle.mux

import com.twitter.finagle.{Dtab, FailureFlags, Filter, Path, Service}
import com.twitter.finagle.Dtab
import com.twitter.finagle.FailureFlags
import com.twitter.finagle.Filter
import com.twitter.finagle.Path
import com.twitter.finagle.Service
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.mux.transport.{Message, MuxFailure}
import com.twitter.finagle.mux.transport.Message
import com.twitter.finagle.mux.transport.MuxFailure
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.tracing.Trace
import com.twitter.util.{Future, Return, Throw}
import com.twitter.io.Buf
import com.twitter.util.Future
import com.twitter.util.Return
import com.twitter.util.Throw

/**
* Processor handles request, dispatch, and ping messages. Request
Expand All @@ -14,16 +23,18 @@ import com.twitter.util.{Future, Return, Throw}
* (This arrangement permits interpositioning other filters to modify ping
* or dispatch behavior, e.g., for testing.)
*/
private[finagle] object ServerProcessor extends Filter[Message, Message, Request, Response] {
private[finagle] class ServerProcessor(statsReceiver: StatsReceiver)
extends Filter[Message, Message, Request, Response] {
import Message._

private[this] val AlwaysEmpty = { _: Throwable => MuxFailure.Empty }
private[this] val contextBytesStat = statsReceiver.stat("request_context_bytes")

private[this] def dispatch(
tdispatch: Message.Tdispatch,
service: Service[Request, Response]
): Future[Message] = {

contextBytesStat.add(contextBytes(tdispatch.contexts))
Contexts.broadcast.letUnmarshal(tdispatch.contexts) {
if (tdispatch.dtab.nonEmpty)
Dtab.local ++= tdispatch.dtab
Expand Down Expand Up @@ -77,4 +88,14 @@ private[finagle] object ServerProcessor extends Filter[Message, Message, Request
case Message.Tping(tag) => Future.value(Message.Rping(tag))
case m => Future.exception(new IllegalArgumentException(s"Cannot process message $m"))
}

private[this] def contextBytes(contexts: Seq[(Buf, Buf)]): Int = {
var bytes = 0
contexts.foreach {
case (key, value) =>
bytes += key.length
bytes += value.length
}
bytes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ private class ServerTracker(
private[this] var h_leaseExpiration: Time = Time.Top
private[this] var h_cachedLocals: Option[Local.Context] = None

private[this] val serverProcessor = new ServerProcessor(statsReceiver)

/** `Lessee` used for issuing lease duration. */
val lessee: Lessee = new Lessee {
def issue(howLong: Duration): Unit = {
Expand Down Expand Up @@ -175,7 +177,7 @@ private class ServerTracker(
else
Local.let(handleGetLocals()) {
lessor.observeArrival()
val responseF = ServerProcessor(m, service)
val responseF = serverProcessor(m, service)
val elapsed = Stopwatch.start()
val dispatch = Dispatch(m.tag, responseF, elapsed)
h_dispatches.put(m.tag, dispatch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.twitter.finagle.mux.pushsession.FragmentingMessageWriter
import com.twitter.finagle.mux.pushsession.MuxClientSession
import com.twitter.finagle.mux.pushsession.MuxServerSession
import com.twitter.finagle.mux.transport.Message
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.finagle.tracing._
import com.twitter.io.Buf
Expand Down Expand Up @@ -65,6 +66,8 @@ private[mux] abstract class ClientServerTest
val pingSends = new AtomicInteger(0)
val pingReceives = new AtomicInteger(0)

val serverStatsReceiver = new InMemoryStatsReceiver

{ // launch the read loops for each queue
def loop(source: AsyncQueue[Buf], dest: QueueChannelHandle[ByteReader, _]): Unit = {
source.poll().respond {
Expand Down Expand Up @@ -122,7 +125,7 @@ private[mux] abstract class ClientServerTest

val server: Closable = {
val session = new MuxServerSession(
params = Mux.server.params,
params = Mux.server.withStatsReceiver(serverStatsReceiver).params,
h_decoder = new FragmentDecoder(NullStatsReceiver),
h_messageWriter =
new FragmentingMessageWriter(serverHandle, Int.MaxValue, NullStatsReceiver),
Expand Down Expand Up @@ -415,4 +418,24 @@ class ClientServerTestDispatch extends ClientServerTest {
assert(response.contexts.nonEmpty)
assert(response.contexts == ctxts)
}

test("Record stat for received request context size")(new Ctx {
val request = Request.empty

when(service(request)).thenAnswer(
new Answer[Future[Response]] {
def answer(invocation: InvocationOnMock) = {
Future.value(Response(request.contexts, Buf.Empty))
}
}
)

await(Contexts.broadcast.let(testContext, Buf.Utf8("My context!")) {
client(Request.empty)
})

assert(
serverStatsReceiver.stats(Seq("request_context_bytes")) == Seq(
"com.twitter.finagle.mux.MuxContext".length + "My context!".length))
})
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
package com.twitter.finagle.thriftmux.pushsession

import com.twitter.finagle.context.{Contexts, RemoteInfo}
import com.twitter.finagle.{Dtab, Path, Service, Stack, Status, Thrift, mux, param}
import com.twitter.finagle.pushsession.{PushChannelHandle, PushSession}
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.context.RemoteInfo
import com.twitter.finagle.Dtab
import com.twitter.finagle.Path
import com.twitter.finagle.Service
import com.twitter.finagle.Stack
import com.twitter.finagle.Status
import com.twitter.finagle.Thrift
import com.twitter.finagle.mux
import com.twitter.finagle.param
import com.twitter.finagle.pushsession.PushChannelHandle
import com.twitter.finagle.pushsession.PushSession
import com.twitter.finagle.mux.transport.Message
import com.twitter.finagle.mux.{ClientDiscardedRequestException, ServerProcessor}
import com.twitter.finagle.mux.ClientDiscardedRequestException
import com.twitter.finagle.mux.ServerProcessor
import com.twitter.finagle.stats.Verbosity
import com.twitter.finagle.thrift.thrift.RequestHeader
import com.twitter.finagle.thrift.{ClientId, InputBuffer, RichRequestHeader}
import com.twitter.finagle.thrift.ClientId
import com.twitter.finagle.thrift.InputBuffer
import com.twitter.finagle.thrift.RichRequestHeader
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.transport.Transport
import com.twitter.io.{Buf, ByteReader}
import com.twitter.logging.{Level, Logger}
import com.twitter.io.Buf
import com.twitter.io.ByteReader
import com.twitter.logging.Level
import com.twitter.logging.Logger
import com.twitter.util._
import java.util
import org.apache.thrift.protocol.TProtocolFactory
Expand Down Expand Up @@ -53,6 +67,8 @@ private final class VanillaThriftSession(
private[this] val pendingGauge = params[param.Stats].statsReceiver
.addGauge(Verbosity.Debug, "pending") { pending.size }

private[this] val serverProcessor = new ServerProcessor(params[param.Stats].statsReceiver)

private[this] val respond: Try[Message] => Unit = {
// Since it's stateless we can reuse the Runnable!
val runnable = new Runnable { def run = handleResponseComplete() }
Expand Down Expand Up @@ -89,7 +105,7 @@ private final class VanillaThriftSession(
if (state != Closed) {
val f = Local.let(locals) {
val dispatch = thriftToMux(isTTwitter, tProtocolFactory, reader.readAll())
ServerProcessor(dispatch, service)
serverProcessor(dispatch, service)
}

pending.offer(f)
Expand Down

0 comments on commit d173acb

Please sign in to comment.