Skip to content

Commit

Permalink
Merge pull request #210 from soundcloud/message-filters
Browse files Browse the repository at this point in the history
Add message filters
  • Loading branch information
Kim Covaci authored Mar 30, 2021
2 parents 8b2bf4f + 3440e97 commit 3719984
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ final class TwinagleServicePrinter(
private[this] val Response = s"$finagleHttp.Response"

private[this] val EndpointMetadata = s"$twinagle.EndpointMetadata"
private[this] val MessageFilter = s"$twinagle.MessageFilter"
private[this] val ClientEndpointBuilder = s"$twinagle.ClientEndpointBuilder"
private[this] val ServerBuilder = s"$twinagle.ServerBuilder"
private[this] val ProtoService = s"$twinagle.ProtoService"
private[this] val AsProtoService = s"$twinagle.AsProtoService"
private[this] val ProtoRpc = s"$twinagle.ProtoRpc"
private[this] val ProtoRpcBuilder = s"$twinagle.ProtoRpcBuilder"

def generateServiceObject(m: ServiceDescriptor): String = {
val serviceName = getServiceName(m)
Expand All @@ -37,17 +38,22 @@ final class TwinagleServicePrinter(
|
| def server(service: $serviceName,
| extension: $EndpointMetadata => $Filter.TypeAgnostic = _ => $Filter.TypeAgnostic.Identity,
| prefix: String = "/twirp"
| ): $Service[$Request, $Response] =
| $ServerBuilder(extension).withPrefix(prefix).register(service).build
| prefix: String = "/twirp",
| messageFilter: $MessageFilter = $MessageFilter.Identity
| ): $Service[$Request, $Response] =
| $ServerBuilder(extension)
| .withPrefix(prefix)
| .withMessageFilter(messageFilter)
| .register(service)
| .build
|}
""".stripMargin
}

def protoRpc(md: MethodDescriptor): String = {
val meta = endpointMetadata(md)
val rpc = s"service.${decapitalizedName(md)}"
s" $ProtoRpc($meta, $rpc _)"
s" $ProtoRpcBuilder($meta, $rpc _)"
}

def endpointMetadata(md: MethodDescriptor): String = {
Expand Down Expand Up @@ -169,7 +175,7 @@ final class TwinagleServicePrinter(
}

private def decapitalize(str: String) =
if (str.length >= 1)
if (str.nonEmpty)
str.substring(0, 1).toLowerCase() + str.substring(1)
else str

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package proto.test

import com.twitter.finagle.{Filter, Service}
import com.twitter.util.{Await, Future}
import com.soundcloud.twinagle.ServerBuilder

import com.soundcloud.twinagle.{MessageFilter, ServerBuilder}
import org.specs2.mutable.Specification
import org.specs2.specification.Scope
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}

import scala.collection.mutable.ListBuffer

class MultiServiceSpec extends Specification {

Expand All @@ -29,4 +33,32 @@ class MultiServiceSpec extends Specification {
Await.result(svc1Client.rpc1(Request())) ==== Response()
Await.result(svc2Client.rpc2(Request())) ==== Response()
}

"ServerBuilder with specified message filter" >> {
trait MessageFilterContext extends Scope {
val recorderRequests = ListBuffer[GeneratedMessage]()
val filter = new MessageFilter {
override def toFilter[
Req <: GeneratedMessage: GeneratedMessageCompanion,
Resp <: GeneratedMessage: GeneratedMessageCompanion
]: Filter[Req, Resp, Req, Resp] =
(request: Req, next: Service[Req, Resp]) => {
recorderRequests += request
next(request)
}
}

val httpService = ServerBuilder()
.register(svc1)
.withMessageFilter(filter)
.build

val svc1Client = new Service1ClientProtobuf(httpService)
}

"should apply filter for each request" in new MessageFilterContext {
Await.result(svc1Client.rpc1(Request())) ==== Response()
recorderRequests.toList ==== List(Request())
}
}
}
30 changes: 30 additions & 0 deletions runtime/src/main/scala/com/soundcloud/twinagle/MessageFilter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.soundcloud.twinagle

import com.twitter.finagle.Filter
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}

trait MessageFilter {
def toFilter[
Req <: GeneratedMessage: GeneratedMessageCompanion,
Resp <: GeneratedMessage: GeneratedMessageCompanion
]: Filter[Req, Resp, Req, Resp]

def andThen(other: MessageFilter): MessageFilter = new MessageFilter {
override def toFilter[
Req <: GeneratedMessage: GeneratedMessageCompanion,
Resp <: GeneratedMessage: GeneratedMessageCompanion
]: Filter[Req, Resp, Req, Resp] = this.toFilter[Req, Resp] andThen other.toFilter[Req, Resp]
}
}

object MessageFilter {
object Identity extends MessageFilter {
override def toFilter[
Req <: GeneratedMessage: GeneratedMessageCompanion,
Resp <: GeneratedMessage: GeneratedMessageCompanion
]: Filter[Req, Resp, Req, Resp] =
Filter.identity

override def andThen(other: MessageFilter): MessageFilter = other
}
}
29 changes: 26 additions & 3 deletions runtime/src/main/scala/com/soundcloud/twinagle/ProtoService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,43 @@ import com.twitter.finagle.http.{Request, Response}
import com.twitter.util.Future
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}

case class ProtoService(rpcs: Seq[ProtoRpc]) {
case class ProtoService(rpcs: Seq[ProtoRpcBuilder]) {
assert(rpcs.map(_.metadata.service).toSet.size == 1, "inconsistent services in metadata")
}

object ProtoService {
implicit val asProtoService: AsProtoService[ProtoService] = (t: ProtoService) => t
}

case class ProtoRpc(metadata: EndpointMetadata, svc: Service[Request, Response])

object ProtoRpc {
def apply[
Req <: GeneratedMessage: GeneratedMessageCompanion,
Resp <: GeneratedMessage: GeneratedMessageCompanion
](endpointMetadata: EndpointMetadata, rpc: Req => Future[Resp]): ProtoRpc = {
val httpService = new TwirpEndpointFilter[Req, Resp] andThen Service.mk(rpc)
ProtoRpc(endpointMetadata, httpService)
ProtoRpcBuilder(endpointMetadata, rpc).build(MessageFilter.Identity)
}
}

trait ProtoRpcBuilder {
val metadata: EndpointMetadata

def build(messageFilter: MessageFilter): ProtoRpc
}

object ProtoRpcBuilder {
def apply[
Req <: GeneratedMessage: GeneratedMessageCompanion,
Resp <: GeneratedMessage: GeneratedMessageCompanion
](endpointMetadata: EndpointMetadata, rpc: Req => Future[Resp]): ProtoRpcBuilder = new ProtoRpcBuilder {
override val metadata: EndpointMetadata = endpointMetadata

override def build(messageFilter: MessageFilter): ProtoRpc = {
val svc = new TwirpEndpointFilter[Req, Resp] andThen
messageFilter.toFilter[Req, Resp] andThen
Service.mk(rpc)
ProtoRpc(endpointMetadata, svc)
}
}
}
30 changes: 21 additions & 9 deletions runtime/src/main/scala/com/soundcloud/twinagle/ServerBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import com.twitter.finagle.{Filter, Service}
*/
class ServerBuilder private (
extension: EndpointMetadata => Filter.TypeAgnostic,
endpoints: Seq[ProtoRpc],
prefix: String
endpoints: Seq[ProtoRpcBuilder],
prefix: String,
messageFilter: MessageFilter
) {

if (prefix.nonEmpty) {
Expand All @@ -26,38 +27,49 @@ class ServerBuilder private (
*/
def register[T: AsProtoService](svc: T): ServerBuilder = {
val protoService = implicitly[AsProtoService[T]].asProtoService(svc)
new ServerBuilder(extension, endpoints ++ protoService.rpcs, prefix)
new ServerBuilder(extension, endpoints ++ protoService.rpcs, prefix, messageFilter)
}

/** withPrefix configures the HTTP path prefix to use for this server (default: `/twirp`).
* Paths must be absolute and may not end with `/`.
* Use an empty string to expose endpoints at the root of the HTTP path.
*/
def withPrefix(prefix: String): ServerBuilder = {
new ServerBuilder(extension, endpoints, prefix)
new ServerBuilder(extension, endpoints, prefix, messageFilter)
}

/** withMessageFilter configures the message filter. Such filters can be used
* to observe and modify request and response payloads
* expressed as ScalaPB's GeneratedMessage.
*/
def withMessageFilter(filter: MessageFilter): ServerBuilder = {
new ServerBuilder(extension, endpoints, prefix, filter)
}

/** create an HTTP server that implements the Twirp wire protocol by
* dispatching to the registered services.
*/
def build: Service[Request, Response] =
new Server(endpoints.map(instrument), prefix)
new Server(endpoints.map(instrumentAndBuild), prefix)

private def instrument(rpc: ProtoRpc): ProtoRpc =
private def instrumentAndBuild(builder: ProtoRpcBuilder): ProtoRpc = {
val rpc = builder.build(messageFilter)
rpc.copy(
svc = extension(rpc.metadata).toFilter andThen
new TracingFilter[Request, Response](rpc.metadata) andThen
rpc.svc
)
}

}

object ServerBuilder {
def apply(
extension: EndpointMetadata => Filter.TypeAgnostic = _ => Filter.TypeAgnostic.Identity,
endpoints: Seq[ProtoRpc] = Seq.empty,
prefix: String = "/twirp"
): ServerBuilder = new ServerBuilder(extension, endpoints, prefix)
endpoints: Seq[ProtoRpcBuilder] = Seq.empty,
prefix: String = "/twirp",
messageFilter: MessageFilter = MessageFilter.Identity
): ServerBuilder = new ServerBuilder(extension, endpoints, prefix, messageFilter)
}

trait AsProtoService[T] {
Expand Down
34 changes: 32 additions & 2 deletions runtime/src/test/scala/com/soundcloud/twinagle/ServerSpec.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package com.soundcloud.twinagle

import com.soundcloud.twinagle.test.TestMessage
import com.twitter.finagle.{CancelledRequestException, Failure, Service}
import com.twitter.finagle.{CancelledRequestException, Failure, Filter, Service}
import com.twitter.finagle.http.{MediaType, Method, Request, Response, Status}
import com.twitter.util.{Await, Future}
import org.specs2.mock.Mockito
import org.specs2.mutable.Specification
import org.specs2.specification.Scope
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}

import scala.collection.mutable.ListBuffer

class ServerSpec extends Specification with Mockito {
trait Context extends Scope {
val rpc = mock[TestMessage => Future[TestMessage]]
val protoService = ProtoService(
Seq(
ProtoRpc(EndpointMetadata("svc", "rpc"), rpc)
ProtoRpcBuilder(EndpointMetadata("svc", "rpc"), rpc)
)
)
val server = ServerBuilder()
Expand Down Expand Up @@ -71,6 +74,33 @@ class ServerSpec extends Specification with Mockito {

}

"generated message filter" in new Context {
val recorderRequests = ListBuffer[GeneratedMessage]()
val filter = new MessageFilter {
override def toFilter[
Req <: GeneratedMessage: GeneratedMessageCompanion,
Resp <: GeneratedMessage: GeneratedMessageCompanion
]: Filter[Req, Resp, Req, Resp] =
(request: Req, next: Service[Req, Resp]) => {
recorderRequests += request
next(request)
}
}
override val server: Service[Request, Response] = ServerBuilder()
.withPrefix("/foo")
.withMessageFilter(filter)
.register(protoService)
.build
val request = httpRequest(path = "/foo/svc/rpc")
val message = TestMessage()
rpc.apply(any) returns Future.value(message)

val response = Await.result(server(request))
response.status ==== Status.Ok

recorderRequests.toList ==== List(message)
}

"exceptions" >> {
"TwinagleException" in new Context {
val request = httpRequest()
Expand Down

0 comments on commit 3719984

Please sign in to comment.