Sample Post

package http
import java.time.LocalDateTime
import java.util.UUID
import akka.actor.ActorSystem
import akka.event.slf4j.Logger
import akka.http.javadsl.model.headers.Authorization
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import com.typesafe.config.Config
import helpers.{ PlayJsonSupport, Tokens }
import main.Bergbuch
import models.{ Subscription, SubscriptionCreationRequest, SubscriptionCreationResponse }
import play.api.libs.json.{ JsObject, JsValue, Json }
import services.AuthService
import services.db.SubscriptionDbService
import scala.concurrent.{ ExecutionContextExecutor, Future }
/**
 * HTTP routes for listing, creating and deleting subscriptions.
 *
 * Every route is wrapped in `authenticate`; unauthenticated requests are answered with
 * `401 Unauthorized`. Creating and deleting a subscription is mirrored to the Nakadi
 * instance configured under `nakadi_host`, persisted through `subscriptionDbService`, and
 * event consumption is started/stopped through `Bergbuch`.
 *
 * @param authService           authentication service (not referenced directly in this class;
 *                              presumably consumed by `SecurityDirectives` - verify there)
 * @param subscriptionDbService persistence for subscriptions
 * @param config                configuration; only the `nakadi_host` key is read here
 */
class SubscriptionRoute(
  val authService: AuthService,
  val subscriptionDbService: SubscriptionDbService,
  config: Config
)(
  implicit
  val system: ActorSystem,
  implicit val materializer: ActorMaterializer,
  implicit val executionContext: ExecutionContextExecutor
) extends PlayJsonSupport with SecurityDirectives {

  val log = Logger("SubscriptionRoute")

  /** Host name of the Nakadi instance, read from the `nakadi_host` config key. */
  val nakadiHost: String = config.getString("nakadi_host")

  import StatusCodes._
  import models.SubscriptionEvent._

  /**
   * The route tree:
   *  - `GET    /subscriptions`      lists all stored subscriptions as JSON
   *  - `POST   /subscriptions`      creates the subscription on Nakadi, stores it and starts
   *                                 consuming events; `201` on success, `400` when Nakadi did
   *                                 not create a new subscription
   *  - `DELETE /subscriptions/{id}` asks Nakadi to delete the subscription, removes it from
   *                                 the database and stops consumption; always `204`
   */
  val route: Route = authenticate {
    case true =>
      pathPrefix("subscriptions") {
        pathEndOrSingleSlash {
          get {
            onSuccess(subscriptionDbService.getSubscriptions) { subs =>
              complete(subs.map(Json.toJson(_)(Subscription.formatSubscription)))
            }
          } ~ post {
            entity(as[SubscriptionCreationRequest]) { creationRequest =>
              onSuccess(createSubscriptionOnNakadi(creationRequest)) {
                case Some(creationResponse) =>
                  val uuid: UUID = UUID.fromString(creationResponse.id)
                  val newSubscription: Subscription = Subscription(uuid, LocalDateTime.now(), active = true)
                  onSuccess(subscriptionDbService.createSubscription(newSubscription)) { _ =>
                    log.info(s"created a new subscription for id : ${creationResponse.id}")
                    Bergbuch.startEventConsumption(newSubscription)
                    complete(HttpResponse(Created))
                  }
                case None => complete(HttpResponse(BadRequest))
              }
            }
          }
        } ~
          pathPrefix(JavaUUID) { id =>
            pathEndOrSingleSlash {
              delete {
                // Tell nakadi to delete the subscription. The request stays fire-and-forget
                // (the local deletion does not wait for it), but a failed future is logged
                // instead of being dropped silently.
                deleteSubscriptionOnNakadi(id.toString).failed.foreach { cause =>
                  log.error(s"request to delete subscription on nakadi failed for id: $id", cause)
                }
                onSuccess(subscriptionDbService.deleteSubscriptions(id)) { changedRows =>
                  if (changedRows == 1) {
                    Bergbuch.stopEventConsumption(id)
                  } else {
                    log.warn(s"failed to remove subscription with id: $id")
                  }
                  complete(HttpResponse(NoContent))
                }
              }
            }
          }
      }
    case false => complete(HttpResponse(Unauthorized))
  }

  /** Current access token registered under the name `nakadi-read`; looked up on every call. */
  def token: String = Tokens.tokens.getAccessToken("nakadi-read").getToken

  /**
   * Creates a subscription on Nakadi.
   *
   * The request body is the JSON form of `subCreationReq` with a fixed
   * `consumer_group` of `"bergbuch"` merged in.
   *
   * @param subCreationReq the creation request received from the client
   * @return `Some(response)` when Nakadi answers `201 Created` and the body could be
   *         unmarshalled; `None` for every other status code (`200 OK` means the
   *         subscription already exists). The future fails if the HTTP request or the
   *         unmarshalling fails.
   */
  def createSubscriptionOnNakadi(subCreationReq: SubscriptionCreationRequest): Future[Option[SubscriptionCreationResponse]] = {
    // prepare post object - `consumer group` is added to avoid collision with other subscriptions
    // this would need to be improved for 'multi-team-bergbuch'-usage
    val inputJson: JsValue = Json.toJson(subCreationReq)
    val consumerGroup: JsObject = Json.obj("consumer_group" -> "bergbuch")
    val mergedJson = inputJson.as[JsObject] ++ consumerGroup
    val postObject = HttpEntity(ContentType(MediaTypes.`application/json`), Json.stringify(mergedJson))
    val subCreationRequest = HttpRequest(
      uri = s"https://$nakadiHost/subscriptions",
      method = HttpMethods.POST,
      entity = postObject,
      headers = scala.collection.immutable.Seq(Authorization.oauth2(token))
    )
    val subCreationResponseFuture = Http().singleRequest(subCreationRequest)
    subCreationResponseFuture.flatMap { response =>
      response.status match {
        case Created =>
          // Unmarshalling consumes the entity, so nothing has to be discarded here.
          Unmarshal(response.entity).to[SubscriptionCreationResponse].map(Some(_))
        case OK =>
          // The body is not needed: drain it so the pooled connection is released.
          response.discardEntityBytes(materializer)
          val optSubResource = response.headers.find(_.lowercaseName == "location").map(_.value)
          log.info(s"subscription already exists at: $optSubResource")
          Future.successful(None)
        case Unauthorized =>
          response.discardEntityBytes(materializer)
          log.error("Nakadi declined my request with 401. Were my tokens refreshed?")
          Future.successful(None)
        case _ =>
          response.discardEntityBytes(materializer)
          log.error(s"failed to create subscription on nakadi with response code: ${response.status}")
          Future.successful(None)
      }
    }
  }

  /**
   * Asks Nakadi to delete the subscription with the given id and logs the outcome.
   *
   * @param id subscription id as used in the Nakadi resource path
   * @return a future that completes once the response status has been logged; any status
   *         other than `204 No Content` is logged as an error but does not fail the future.
   *         The future fails only if the HTTP request itself fails.
   */
  def deleteSubscriptionOnNakadi(id: String): Future[Unit] = {
    log.debug(s"requesting sub deletion at nakadi for sub id: $id")
    val subDeletionRequest = HttpRequest(uri = s"https://$nakadiHost/subscriptions/$id", method = HttpMethods.DELETE)
    val subDeletionResponseFuture = Http().singleRequest(subDeletionRequest.addHeader(Authorization.oauth2(token)))
    subDeletionResponseFuture.map { response =>
      // Only the status is inspected; drain the entity so the pooled connection is released.
      response.discardEntityBytes(materializer)
      response.status match {
        case NoContent => log.info(s"deleted subscription with id: $id")
        case _ => log.error(s"failed to delete subscription on nakadi with response code: ${response.status}")
      }
    }
  }
}
Show your support

Clapping shows how much you appreciated aravindh’s story.