1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
| object UserEntity { final case class State( tokens: Map[String, DueEpochMillis] = Map(), refreshTokens: Map[String, DueEpochMillis] = Map()) extends CborSerializable { def clear(clearTokens: IterableOnce[String], clearRefreshTokens: IterableOnce[String]): State = copy(tokens = tokens -- clearTokens, refreshTokens = refreshTokens -- clearRefreshTokens)
def addToken(created: TokenCreated): State = { val tokenDue = OAuthUtils.expiresInToEpochMillis(created.accessTokenExpiresIn) val refreshTokenDue = OAuthUtils.expiresInToEpochMillis(created.refreshTokenExpiresIn) State(tokens + (created.accessToken -> tokenDue), refreshTokens + (created.refreshToken -> refreshTokenDue)) }
def addToken(accessToken: String, expiresIn: FiniteDuration): State = copy(tokens = tokens + (accessToken -> OAuthUtils.expiresInToEpochMillis(expiresIn))) }
sealed trait Command extends CborSerializable
final case class CreateToken(replyTo: ActorRef[AccessToken]) extends Command final case class CheckToken(accessToken: String, replyTo: ActorRef[Int]) extends Command final case class RefreshToken(refreshToken: String, replyTo: ActorRef[Option[AccessToken]]) extends Command final case object ClearTick extends Command
sealed trait Event extends CborSerializable final case class TokenCreated( accessToken: String, accessTokenExpiresIn: FiniteDuration, refreshToken: String, refreshTokenExpiresIn: FiniteDuration) extends Event final case class TokenRefreshed(accessToken: String, expiresIn: FiniteDuration) extends Event final case class ClearEvent(clearTokens: Set[String], clearRefreshTokens: Set[String]) extends Event
val TypeKey: EntityTypeKey[Command] = EntityTypeKey("UserEntity")
def init(system: ActorSystem[_]): ActorRef[ShardingEnvelope[Command]] = ClusterSharding(system).init( Entity(TypeKey)(ec => apply(ec)) .withSettings(ClusterShardingSettings(system).withPassivateIdleEntityAfter(Duration.Zero)))
private def apply(ec: EntityContext[Command]): Behavior[Command] = { val userId = ec.entityId Behaviors.setup( context => Behaviors.withTimers(timers => new UserEntity(PersistenceId.of(ec.entityTypeKey.name, ec.entityId), userId, timers, context) .eventSourcedBehavior())) } }
import blog.oauth2.peruser.UserEntity._ class UserEntity private ( persistenceId: PersistenceId, userId: String, timers: TimerScheduler[Command], context: ActorContext[Command]) { timers.startTimerWithFixedDelay(ClearTick, 2.hours)
def eventSourcedBehavior(): EventSourcedBehavior[Command, Event, State] = EventSourcedBehavior( persistenceId, State(), (state, command) => command match { case CheckToken(accessToken, replyTo) => processCheckToken(state, accessToken, replyTo) case RefreshToken(refreshToken, replyTo) => processRefreshToken(state, refreshToken, replyTo) case CreateToken(replyTo) => processCreateToken(replyTo) case ClearTick => processClear(state) }, (state, event) => event match { case TokenRefreshed(accessToken, expiresIn) => state.addToken(accessToken, expiresIn) case created: TokenCreated => state.addToken(created) case ClearEvent(clearTokens, clearRefreshTokens) => state.clear(clearTokens, clearRefreshTokens) })
private def processRefreshToken( state: State, refreshToken: String, replyTo: ActorRef[Option[AccessToken]]): Effect[Event, State] = { if (state.refreshTokens.get(refreshToken).exists(due => System.currentTimeMillis() < due)) { val refreshed = TokenRefreshed(OAuthUtils.generateToken(userId), 2.hours) Effect .persist(refreshed) .thenReply(replyTo)(_ => Some(AccessToken(refreshed.accessToken, refreshed.expiresIn.toSeconds, refreshToken))) } else { Effect.reply(replyTo)(None) } }
private def processCheckToken(state: State, accessToken: String, replyTo: ActorRef[Int]): Effect[Event, State] = { val status = state.tokens.get(accessToken) match { case Some(dueTimestamp) => if (System.currentTimeMillis() < dueTimestamp) 200 else 401 case None => 401 } Effect.reply(replyTo)(status) }
private def processCreateToken(replyTo: ActorRef[AccessToken]): Effect[Event, State] = { val createdEvent = TokenCreated(OAuthUtils.generateToken(userId), 2.hours, OAuthUtils.generateToken(userId), 30.days) Effect.persist(createdEvent).thenReply(replyTo) { _ => AccessToken(createdEvent.accessToken, createdEvent.accessTokenExpiresIn.toSeconds, createdEvent.refreshToken) } }
private def processClear(state: State): Effect[Event, State] = { if (state.tokens.isEmpty && state.refreshTokens.isEmpty) { Effect.stop() } else { val now = System.currentTimeMillis() val clearTokens = state.tokens.view.filterNot { case (_, due) => now < due }.keys.toSet val clearRefreshTokens = state.refreshTokens.view.filterNot { case (_, due) => now < due }.keys.toSet Effect.persist(ClearEvent(clearTokens, clearRefreshTokens)) } } }
|