Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle circuit breaker rejection #1246

Merged
merged 2 commits into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package com.netflix.atlas.akka

import java.lang.reflect.Type
import java.util.zip.Deflater

import akka.http.scaladsl.coding.Coders
import akka.http.scaladsl.model.EntityStreamSizeException
import akka.http.scaladsl.model.HttpHeader
Expand All @@ -26,12 +25,14 @@ import akka.http.scaladsl.model.StatusCode
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.server.AuthenticationFailedRejection
import akka.http.scaladsl.server.CircuitBreakerOpenRejection
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.MalformedRequestContentRejection
import akka.http.scaladsl.server.MethodRejection
import akka.http.scaladsl.server.Rejection
import akka.http.scaladsl.server.RejectionHandler
import akka.http.scaladsl.server.Route
import akka.pattern.CircuitBreakerOpenException
import com.fasterxml.jackson.core.JsonProcessingException
import com.netflix.iep.service.ClassFactory
import com.typesafe.config.Config
Expand Down Expand Up @@ -202,6 +203,8 @@ object RequestHandler extends StrictLogging {
DiagnosticMessage.error(StatusCodes.NotFound, e)
case e: EntityStreamSizeException =>
DiagnosticMessage.error(StatusCodes.PayloadTooLarge, e)
case e: CircuitBreakerOpenException =>
DiagnosticMessage.error(StatusCodes.ServiceUnavailable, e)
case e: Throwable =>
DiagnosticMessage.error(StatusCodes.InternalServerError, e)
}
Expand All @@ -215,6 +218,8 @@ object RequestHandler extends StrictLogging {
val builder = RejectionHandler
.newBuilder()
.handle {
case CircuitBreakerOpenRejection(t) =>
complete(errorResponse(t))
case MalformedRequestContentRejection(_, t) =>
complete(errorResponse(t))
case MethodRejection(m) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
*/
package com.netflix.atlas.akka

import akka.actor.ActorSystem

import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

import akka.http.scaladsl.model.HttpEntity
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers._
Expand All @@ -28,6 +29,8 @@ import com.netflix.iep.service.DefaultClassFactory
import com.typesafe.config.ConfigFactory
import org.scalatest.funsuite.AnyFunSuite

import java.lang.reflect.Type

class RequestHandlerNoCompressionsSuite extends AnyFunSuite with ScalatestRouteTest {

import scala.concurrent.duration._
Expand All @@ -47,7 +50,13 @@ class RequestHandlerNoCompressionsSuite extends AnyFunSuite with ScalatestRouteT
""".stripMargin
)

private val handler = new RequestHandler(config, new DefaultClassFactory())
private val bindings: java.util.function.Function[Type, AnyRef] = {
case c: Class[_] if c.isAssignableFrom(classOf[ActorSystem]) =>
system
case _ =>
null.asInstanceOf[AnyRef]
}
private val handler = new RequestHandler(config, new DefaultClassFactory(bindings))
private val routes = handler.routes

private def gzip(data: Array[Byte]): Array[Byte] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
*/
package com.netflix.atlas.akka

import akka.actor.ActorSystem

import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

import akka.http.scaladsl.model.HttpEntity
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers._
Expand All @@ -28,6 +29,8 @@ import com.netflix.iep.service.DefaultClassFactory
import com.typesafe.config.ConfigFactory
import org.scalatest.funsuite.AnyFunSuite

import java.lang.reflect.Type

class RequestHandlerSuite extends AnyFunSuite with ScalatestRouteTest {

import scala.concurrent.duration._
Expand All @@ -52,7 +55,13 @@ class RequestHandlerSuite extends AnyFunSuite with ScalatestRouteTest {
""".stripMargin
)

private val handler = new RequestHandler(config, new DefaultClassFactory())
private val bindings: java.util.function.Function[Type, AnyRef] = {
case c: Class[_] if c.isAssignableFrom(classOf[ActorSystem]) =>
system
case _ =>
null.asInstanceOf[AnyRef]
}
private val handler = new RequestHandler(config, new DefaultClassFactory(bindings))
private val routes = handler.routes

test("/not-found") {
Expand Down Expand Up @@ -201,4 +210,16 @@ class RequestHandlerSuite extends AnyFunSuite with ScalatestRouteTest {
assert(response.status === StatusCodes.MethodNotAllowed)
}
}

test("/circuit-breaker") {
// Trigger failure to open the breaker
Get("/circuit-breaker") ~> routes ~> check {
assert(response.status === StatusCodes.InternalServerError)
}

// Ensure rejection handler returns 503
Get("/circuit-breaker") ~> routes ~> check {
assert(response.status === StatusCodes.ServiceUnavailable)
}
}
}
42 changes: 35 additions & 7 deletions atlas-akka/src/test/scala/com/netflix/atlas/akka/TestApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,53 @@
*/
package com.netflix.atlas.akka

import akka.actor.ActorRefFactory
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes
import akka.http.scaladsl.model.HttpEntity
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.pattern.CircuitBreaker
import akka.stream.scaladsl.Source

class TestApi(val actorRefFactory: ActorRefFactory) extends WebApi {
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success

class TestApi(val system: ActorSystem) extends WebApi {

import CustomDirectives._
import scala.concurrent.duration._

private implicit val ec: ExecutionContext = OpportunisticEC.ec

private val breaker = new CircuitBreaker(
system.scheduler,
maxFailures = 1,
callTimeout = 5.seconds,
resetTimeout = 1.second
)

private def fail(): Future[Unit] = {
Future.failed(new RuntimeException("circuit breaker test"))
}

def routes: Route = {
path("jsonparse") {
post {
parseEntity(json[String]) { v =>
complete(HttpResponse(status = OK, entity = v))
complete(HttpResponse(status = StatusCodes.OK, entity = v))
}
}
} ~
path("query-parsing-directive") {
get {
parameter("regex") { v =>
val entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, v)
complete(HttpResponse(status = OK, entity = entity))
complete(HttpResponse(status = StatusCodes.OK, entity = entity))
}
}
} ~
Expand All @@ -50,7 +70,7 @@ class TestApi(val actorRefFactory: ActorRefFactory) extends WebApi {
extractRequest { req =>
val v = req.uri.query().get("regex").get
val entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, v)
complete(HttpResponse(status = OK, entity = entity))
complete(HttpResponse(status = StatusCodes.OK, entity = entity))
}
}
} ~
Expand All @@ -61,7 +81,15 @@ class TestApi(val actorRefFactory: ActorRefFactory) extends WebApi {
.single(ChunkStreamPart("start"))
.concat(Source((1 until 42).map(i => ChunkStreamPart(i.toString)).toList))
val entity = HttpEntity.Chunked(ContentTypes.`text/plain(UTF-8)`, source)
complete(HttpResponse(status = OK, entity = entity))
complete(HttpResponse(status = StatusCodes.OK, entity = entity))
}
}
} ~
path("circuit-breaker") {
get {
onCompleteWithBreaker(breaker)(fail()) {
case Success(_) => complete(StatusCodes.OK)
case Failure(e) => complete(StatusCodes.InternalServerError, e.getMessage)
}
}
}
Expand Down