你可能用错了 kafka 的重试机制( 四 )


重试主题什么时候可行?需要明确的是 , 重试主题并非一直都是错误的模式 。 当然 , 它也存在一些合适的用例 。 具体来说 , 当消费者的工作是收集不可修改的记录时 , 这种模式就很不错 。 这样的例子可能包括:
处理网站活动流以生成报告的消费者
将交易添加到分类账的消费者(只要这些交易用不着按特定顺序跟踪)
正在从另一个数据源ETL数据的消费者
这类消费者可能会从重试主题模式中受益 , 同时没有数据损坏的风险 。
不过 , 请注意即使存在这种用例 , 我们仍应谨慎行事 。 构建这样的解决方案既复杂又耗时 。 因此 , 作为一个组织 , 我们不想为每个新的消费者编写一个新的解决方案 。 相反 , 我们要创建一个统一的解决方案 , 比如一个库或一个容器等 , 可以在各种服务之间重复使用 。
还存在另一个问题 。 我们可能会为相关消费者构建一个重试主题的解决方案 。 不幸的是 , 不久之后 , 这个解决方案就会进入跨边界事件发布消费者的领域了 。 拥有这些消费者的团队可能没有意识到风险的存在 。 正如我们前面所讨论的那样 , 在发生重大数据损坏之前 , 他们可能不会意识到任何问题 。
因此 , 在实现重试主题解决方案之前 , 我们应100%确定:
我们的业务中永远不会有消费者来更新现有数据 , 或者
我们拥有严格的控制措施 , 以确保我们的重试主题解决方案不会在此类消费者中实现
我们如何改善这种模式?鉴于重试主题模式可能不是跨边界事件发布消费者的可接受解决方案 , 我们是否可以对其做一些调整来改善它呢?
一开始 , 本文想要提供一种完整的解决方案 。 但之后我意识到 , 并不存在什么万能的路径 。 因此 , 我们将只讨论一些在制定合适解决方案时需要考虑的事项 。
消除错误类型如果我们能够在可恢复错误和不可恢复错误之间消除歧义 , 生活就会变得轻松许多 。 例如 , 如果我们的消费者开始遇到可恢复错误 , 那么重试主题就变得多余了 。
因此 , 我们可以尝试确定所遇到的错误类型:
voidprocessMessage(KafkaMessagekm){try{Messagem=km.getMessage;transformAndSave(m);}catch(Throwablet){if(isRecoverable(t)){//...}else{//...}}}在上面的Java伪代码示例中 , isRecoverable将采用一种白名单方法来确定t是否表示可恢复错误 。 换句话说 , 它检查t以确定它是否与任何已知的可恢复错误(例如SQL连接错误或ReST客户端超时)相匹配 , 如果匹配则返回true , 否则返回false 。 这样就能防止我们的消费者被不可恢复错误一直阻塞下去 。
诚然 , 要在可恢复错误和不可恢复错误之间消除歧义可能很困难 。 例如 , 一个SQLException可能指的是一次数据库故障(可恢复)或一次约束违反状况(不可恢复) 。 如有疑问 , 我们可能应该假设错误是不可恢复的——为此要冒的风险是将其他好的消息发送给隐藏主题 , 从而延迟它们的处理……但这也能避免我们无意间陷入泥潭 , 无休止地尝试处理不可恢复错误 。
在消费者内重试可恢复错误正如我们所讨论的那样 , 存在可恢复错误时 , 将消息发布到重试主题毫无意义 。 我们只会为下一条消息的失败扫清道路 。 相反 , 消费者可以简单地重试 , 直到条件恢复 。
当然 , 出现可恢复错误意味着外部资源存在问题 。 我们不断对这块资源发送请求是无济于事的 。 因此 , 我们希望对重试应用一个退避策略 。 我们的伪Java代码现在可能看起来像这样:
voidprocessMessage(KafkaMessagekm){try{Messagem=km.getMessage;transformAndSave(m);}catch(Throwablet){if(isRecoverable(t)){doWithRetry(m,Backoff.EXPONENTIAL,this::transformAndSave);}else{//...}}}(注意:我们使用的任何退避机制都应配置为在达到某个阈值时向我们发出警报 , 并通知我们潜在的严重错误)
遇到不可恢复错误时 , 将消息直接发送到最后一个主题另一方面 , 当我们的消费者遇到不可恢复错误时 , 我们可能希望立即隐藏(stash)该消息 , 以释放后续消息 。 但在这里使用多个重试主题会有用吗?答案是否定的 。 在转到DLQ之前 , 我们的消息只会经历n次消费失败而已 。 那么 , 为什么不从一开始就将消息粘贴在那里呢?
与重试主题一样 , 这个主题(在这里 , 我们将其称为隐藏主题)将拥有自己的消费者 , 其与主消费者保持一致 。 但就像DLQ一样 , 这个消费者并不总是在消费消息;它只有在我们明确需要时才会这么做 。
考虑排序来看看排序的情况 。 我们在这里重用之前的“用户/登录”示例 。 尝试处理Zo?名称中的?字符时 , Login消费者可能会遇到错误 。 消费者将其识别为一个不可恢复错误 , 将消息放在一边 , 然后继续处理后续消息 。 不久之后 , 消费者将获得Zoiee消息并成功处理它 。