No dinâmico mundo do desenvolvimento web, a capacidade de lidar com eventos de forma eficiente e confiável é crucial. Muitas vezes, nos deparamos com cenários complexos onde o processamento imediato de eventos não é viável ou seguro. É aí que o Inbox Pattern entra em cena, oferecendo uma abordagem robusta para o processamento assíncrono de eventos, garantindo que nenhuma informação seja perdida e que as operações sejam executadas de forma controlada e escalável.
Este artigo explora em profundidade o Inbox Pattern, seus benefícios, diferentes tipos de processamento e uma implementação prática para um cenário comum: detecção de fraudes em um sistema de pedidos. Vamos mergulhar nos detalhes de como essa arquitetura pode transformar a maneira como você lida com eventos em suas aplicações web.
Por que utilizar o Inbox Pattern?
Nem todos os eventos podem ser processados imediatamente após a sua chegada. Existem situações onde o processamento imediato pode levar a problemas sérios, comprometendo a integridade e a estabilidade do sistema. O Inbox Pattern surge como uma solução para estes desafios, permitindo um processamento mais cuidadoso e controlado. Considere os seguintes cenários:
- Cálculos Complexos: Operações demoradas que podem exceder o limite de tempo de resposta do broker de mensagens, resultando em falhas e reenvios da mensagem, potencialmente causando processamento duplicado.
- Conversões de Documentos: Processamento de arquivos grandes, como conversão de HTML para PDF, que são inerentemente imprevisíveis em termos de tempo de execução.
- Chamadas a Serviços Externos: Envio de e-mails via provedores como SendGrid ou Mailgun, que podem ter latência variável e dependem da disponibilidade de serviços terceiros.
- Agregação de Múltiplos Eventos: Combinação de dados de vários eventos antes de iniciar o processamento, exigindo um mecanismo para armazenar e correlacionar os eventos.
- Operações com Uso Intensivo de Recursos: Tarefas que exigem um uso significativo de CPU, memória ou operações de I/O, que podem sobrecarregar o sistema se executadas de forma síncrona.
Os Problemas do Processamento Imediato
Processar eventos diretamente no consumidor pode levar a uma série de problemas que afetam a confiabilidade e a escalabilidade da sua aplicação:
- Problemas de Timeout: Operações complexas podem exceder o tempo limite (timeout) do broker de mensagens. Isso faz com que o broker considere a operação como falha e reenvie a mensagem, resultando em processamento duplicado.
- Perda de Controle sobre Retentativas: Quando a execução falha, você fica à mercê da política de retentativas do broker. A maioria dos brokers usa um backoff exponencial simples, sem controle refinado sobre as tentativas, o tempo entre elas ou o rastreamento de falhas.
- Esgotamento de Recursos: Um alto volume de eventos pode sobrecarregar o serviço se cada evento acionar operações dispendiosas de forma síncrona.
O Inbox Pattern resolve esses problemas ao desacoplar a recepção do evento do seu processamento. O evento é persistido primeiro e, em seguida, processado de forma assíncrona, dando controle total sobre a lógica e o tratamento de falhas.
Tipos de Processamento
Com base na experiência, é possível identificar três padrões distintos de processamento de eventos:
1. Processamento de Evento Único
Cada evento é processado de forma independente, sem considerar outros eventos. Isso é ideal para cenários onde cada evento contém toda a informação necessária para a execução da lógica de negócios.
Exemplo: A mudança de status de um usuário para "SUSPEITO". O sistema de detecção de fraudes precisa revisar todos os pedidos ativos desse usuário, verificar os métodos de pagamento e potencialmente sinalizar as transações, tudo isso independentemente de outras mudanças de status do usuário. Cada evento de atualização contém o estado anterior e o atual, facilitando a identificação da mudança.
2. Processamento Sequencial
Eventos para a mesma entidade devem ser processados em ordem para manter a consistência. A ordem de processamento é fundamental para garantir que o estado da entidade seja atualizado corretamente.
Exemplo: Cálculo de pontos de recompensa de um usuário com base no histórico de compras. Eventos como CompraConcluída
→ ReembolsoEmitido
→ BônusAplicado
devem ser processados em sequência por usuário. O saldo final de pontos depende do processamento desses eventos na ordem exata em que ocorreram.
3. Processamento em Lote (Batch)
Múltiplos eventos são processados juntos para aumentar a eficiência. Isso é útil para operações que podem ser otimizadas ao serem executadas em um conjunto de dados em vez de individualmente.
Exemplo: Geração de relatórios financeiros diários, coletando todos os eventos de transação ao longo do dia e processando-os juntos à meia-noite. Isso evita recalcular totais, médias e resumos após cada transação individual.
Este artigo se concentra no Processamento de Evento Único, demonstrando como o Inbox Pattern pode ser aplicado nesse cenário.
Arquitetura de Demonstração
Para demonstrar o Inbox Pattern, foi criada uma arquitetura modular com três serviços:
- Serviço de Usuário: Gerencia contas de usuário e mudanças de status.
- Serviço de Produto: Lida com o catálogo de produtos.
- Serviço de Pedido: Processa pedidos e se inscreve em eventos de usuário e produto.
Cenário: Quando o status de um usuário muda, o Serviço de Usuário emite um evento. O Serviço de Pedido se inscreve nesses eventos, armazena-os em uma "caixa de entrada" (inbox) e realiza verificações de fraude quando o status envolve "SUSPEITO".
Você pode explorar a implementação completa neste GitLab Repository.
Como Tudo Funciona em Conjunto
Vamos percorrer o fluxo completo antes de mergulhar no código:
- O Serviço de Usuário altera o status de um usuário para SUSPEITO.
- O evento é publicado via broker de mensagens (Kafka, RabbitMQ, etc.). Neste caso, é utilizada uma implementação interna do Spring.
- O consumidor do Serviço de Pedido recebe o evento e o armazena imediatamente na caixa de entrada (inbox).
- Um agendador (scheduler) é acionado a cada 10 segundos.
- O runner busca eventos pendentes no banco de dados da caixa de entrada.
- O handler processa cada evento, realizando verificações de fraude nos pedidos do usuário.
- Em caso de sucesso: O evento é marcado como ENVIADO, indicando que o processamento foi concluído.
- Em caso de falha: O evento é agendado para retentativa com backoff exponencial.
- A retentativa continua até o sucesso ou até atingir o limite máximo de tentativas.
O ponto chave é que a única tarefa do consumidor de eventos é armazenar o evento rapidamente e confirmar o recebimento ao broker. Todo o processamento complexo acontece de forma assíncrona, com controle total sobre as retentativas e o tratamento de falhas.
Implementação: Exemplo de Detecção de Fraude
Agora, vamos ver como isso é implementado no código.
Passo 1: Armazenando Eventos
Quando um evento chega, ele é armazenado imediatamente com o contexto que define como deve ser processado:
@EventConsumer
class UserInboxEventsConsumerV1(
private val storage: OrderInboxEventStorageAdapter,
) {
fun onStatusUpdated(event: EventV1<UserStatusUpdatedV1>) {
storage.create(
event,
InboxEventContext.single(InboxTopic.FRAUD)
)
}
}
Pontos importantes:
InboxEventContext.single()
marca o evento para processamento único.InboxTopic.FRAUD
agrupa eventos relacionados.- O evento é armazenado como está – nenhum processamento acontece aqui.
Passo 2: Definindo o Contexto do Evento
A estrutura de contexto informa ao sistema como processar os eventos:
data class InboxEventContext(
val topic: String,
val processingType: ProcessingType
) {
companion object {
fun single(topic: String) = InboxEventContext(
topic = topic,
processingType = ProcessingType.SINGLE,
)
}
}
enum class ProcessingType {
SINGLE,
SEQUENTIAL,
BATCH
}
Cada evento armazenado contém:
data class InboxEventData(
val id: UUID,
val context: InboxEventContext,
val event: EventV1<out EventDtoBody>,
val notification: Notification = Notification(),
)
Passo 3: Rastreando o Estado do Processamento
O objeto Notification
rastreia as tentativas de processamento e o status:
data class Notification(
val status: Status = Status.PENDING,
val attempts: Int = 0,
val executeAt: Instant? = Instant.now(),
val failedReasons: List<FailedReason> = listOf(),
) {
enum class Status {
PENDING, SENT, FAILED
}
}
Lógica de retentativa com backoff exponencial:
fun Notification.toFailure(
exception: Exception,
maxRetries: Int,
baseRetryIn: Int,
): Notification {
val limitExceeded = attempts >= maxRetries
val newAttempts = attempts + 1
val nextExecuteAt =
if (limitExceeded) null
else executeAt?.plusSeconds(baseRetryIn * newAttempts.toLong())
return copy(
status = if (limitExceeded) Status.FAILED else Status.PENDING,
attempts = newAttempts,
failedReasons = failedReasons + FailedReason(
occurredAt = Instant.now(),
message = exception.message ?: "Unknown error"
),
executeAt = nextExecuteAt,
)
}
Passo 4: Configurando o Agendador
Defina quando e com que frequência processar os eventos:
@ConfigurationProperties(prefix = "order.inbox.single")
data class OrderSingleInboxEventProps(
override val pageSize: Int = 20,
override val maxRetries: Int = 15,
override val baseRetryIn: Int = 2,
override val executeEvery: Duration = Duration.ofSeconds(10),
override val topic: String? = null,
) : VTInboxEventProps
Detalhes da configuração:
pageSize: 20
– Processa 20 eventos por lote.maxRetries: 15
– Retenta eventos com falha até 15 vezes.baseRetryIn: 2
– Começa com atrasos de 2 segundos, aumentando exponencialmente.executeEvery: 10s
– Verifica novos eventos a cada 10 segundos.topic: null
– Processa todos os eventos do tipo SINGLE (ou especifica um tópico para controle granular).
Passo 5: Configurando a Infraestrutura de Processamento
Conecte tudo com a configuração do Spring:
@Configuration
@EnableSchedulerLock(defaultLockAtMostFor = "PT30S")
@EnableConfigurationProperties(OrderSingleInboxEventProps::class)
class OrderInboxEventConfig {
@Bean
fun orderSingleEventRunner(
orderInboxEventStorageAdapter: OrderInboxEventStorageAdapter,
orderInboxEventHandlers: List<VTInboxEventHandler<*>>,
props: OrderSingleInboxEventProps,
) = VTSingleEventRunner(
repository = orderInboxEventStorageAdapter,
handlers = orderInboxEventHandlers,
props = props,
)
@Bean
fun orderSingleEventScheduler(
orderMongoTemplate: MongoTemplate,
orderSingleEventRunner: VTSingleEventRunner,
taskScheduler: TaskScheduler,
props: OrderSingleInboxEventProps,
) = VTEventScheduler(
lockProvider = MongoLockProvider(orderMongoTemplate.db),
eventRunner = orderSingleEventRunner,
taskScheduler = taskScheduler,
props = props
)
}
Importante: @EnableSchedulerLock
evita o processamento duplicado em ambientes distribuídos usando ShedLock.
Passo 6: O Event Runner
O runner busca eventos pendentes e os processa em lotes:
class VTSingleEventRunner(
private val repository: VTInboxEventStoragePort,
private val handlers: List<VTInboxEventHandler<*>>,
private val props: VTInboxEventProps,
) : VTEventRunner {
private val log = KotlinLogging.logger {}
override fun run() {
var events: List<InboxEventData>
do {
events = props.topic?.let { repository.findPendingByTopic(it, props.pageSize) }
?: repository.findPendingByType(ProcessingType.SINGLE, props.pageSize)
events.forEach { processOne(it) }
} while (events.isNotEmpty())
}
private fun processOne(event: InboxEventData) {
var notification: Notification = event.notification
try {
log.info { "Processing inbox event: ${event.id}, topic: ${event.context.topic}" }
handlers
.find { it.topic == event.context.topic }
?.let {
@Suppress("UNCHECKED_CAST")
(it as VTInboxEventHandler<EventDtoBody>).handle(event.event as EventV1<EventDtoBody>)
}
notification = event.notification.toSuccess()
} catch (e: Exception) {
log.error(e) { "Failed to process event: ${event.id}" }
notification = event.notification.toFailure(e, props.maxRetries, props.baseRetryIn)
} finally {
updateEvent(event, notification)
}
}
private fun updateEvent(event: InboxEventData, notification: Notification) {
try {
repository.update(event.copy(notification = notification))
} catch (e: Exception) {
log.error(e) { "Failed to update event: ${event.id}" }
}
}
}
Fluxo de processamento:
- Busca eventos pendentes (por tópico ou tipo de processamento).
- Encontra o handler apropriado para o tópico de cada evento.
- Executa o handler.
- Marca como sucesso ou agenda retentativa em caso de falha.
- Continua até não haver mais eventos pendentes.
Passo 7: O Agendador
O agendador garante o processamento regular com bloqueio distribuído:
class VTEventScheduler(
private val eventRunner: VTEventRunner,
private val taskScheduler: TaskScheduler,
private val props: VTInboxEventProps,
lockProvider: ExtensibleLockProvider
) : ApplicationListener<ContextRefreshedEvent> {
private val schedulingLockProvider = SchedulingLockProvider(lockProvider)
private val lockName = "inbox-${props.topic ?: eventRunner::class.simpleName}"
override fun onApplicationEvent(event: ContextRefreshedEvent) {
taskScheduler.scheduleWithFixedDelay(
{ processEvents() },
props.executeEvery
)
}
private fun processEvents() {
DefaultLockingTaskExecutor(schedulingLockProvider)
.executeWithLock(
Runnable { eventRunner.run() },
LockConfiguration(
Instant.now