alpakka-kafka(10)-用kafka实现分布式近实时交易


  随着网上购物消费模式热度的不断提高,网上销售平台上各种促销手段也层出不穷,其中“秒购”已经是各种网站普遍流行的促销方式了。“秒购”对数据的实效性和精确性要求非常高,所以通过分布式运算实现高并发数据处理应该是正确的选择。不过,高并发也意味着高频率的数据操作冲突,而高频使用“锁”又会严重影响效率及容易造成不可控异常,所以又被迫选择单线程运行模式。单线程、分布式虽然表面相悖,不过如上篇博文所述:可以利用akka-cluster-sharding分片可指定调用的特性将一种商品的所有操作放到同一个shard上运算(因为shard即是actor,mailbox里的运算指令是按序执行的)可容许在一个分布式环境下有多个分片来同时操作。如此可在获取分布式运算高效率的同时又保证了数据的安全性和完整性。

虽然通过分布式运算可以实现近实时的“秒购”交易,但每个“秒购”请求都直接被发往一个actor信箱里等待执行,如果在一个短时间内出现超大量请求的话就很可能使shard actor mailbox超载,造成系统崩溃,这时需要一种缓冲机制根据具体负载情况来推送任务。当然,这种机制必须具备数据持久化能力,所以kafka是这个缓冲机制的一个最佳选择。

在这篇讨论里我想通过一个“近实时交易平台nrtxn(near realtime transaction)”项目来示范“用kafka实现分布式近实时交易”具体的设计和实现。

nrtxn的应用方案是这样的:提供一个平台及相关api给平台用户。各用户分别将自己的产品推送到平台数据库由平台托管。用户通过平台提供的http api向nrtxn平台提交交易请求(如库存扣减请求),等待或查询平台返回操作状态回应。

nrtxn的系统流程如下:

用户调用http api提交请求 ->
http-server将请求派送给各用户所属的分片workManager ->
workManager将请求写入kafka ->
kafka reader读出请求并按请求中交易项目将请求发送给项目所属的分片txnProcessor->
txnProcessor完成操作后发送回应至workManager ->
workManager在按请求所属的回应地址将最终回应返回给http server -> 用户获取请求回应
值得注意的是交易请求在到达终点actor txnProcessor传递中途经过了kafka,所以在txnProcessor完成数据操作后需要通过一些actor地址管理才能正确地回应到http server上的请求线程。
我们大致可以从请求内容了解平台提供的功能:

class TxnRequest( //唯一键:shopId+seller+itemCode+reqTime
                     shopId: String = "",        //托售单位(平台用户)
                     seller: String = "",          //售货组别
                     buyer: String = "",         //购货单位
                     respond: Int = 1,           //1=需要返回操作结果,0=不返回response (fire-and-go)
                     reqTime: String = "",       //请求时间(yyyyMMddHHmmssSSS)
                     itemCode: String = "",      //交易项目
                     reqValue: Double = 0.00,    //操作价值
                     remarks: String = "",       //操作备注
                     submitTm: String = "",      //提交时间 由系统填写(yyyyMMddHHmmssSSS)
                                                            //写入kafka之前填写,读出时超出指定时段视为无效请求
)

操作请求支持两种模式:request-response, fire-and-go, 由respond值表示。无论如何,平台都会产生相应的response并记录在平台数据库里。用户可在使用fire-and-go模式或者系统出现异常情况时通过原request查询对应的response。

回应response内容如下:

case class TxnResponse(
                          shopId: String = "",        //托售单位(平台用户)
                          seller: String = "",        //售货单位
                          buyer: String = "",         //购货单位
                          reqTime: String = "",       //请求时间(yyyyMMddHHmmssSSS
                          failed: Int = 0,            //操作失败 <> 0
                          reason: String = "",        //失败原因说明
                          rspTime: String = "",       //完成时间
                          itemCode: String = "",      //操作目标
                          rsrvdValue: Double = 0.00,  //系统预留价值                           
                          reqValue: Double = 0,       //操作价值
                          preValue: Double = 0.00,    //操作前价值
                          reqValue: Double = 0.00,    //操作价值
                        )

1、http api 发送request到workManager分片;

pathPrefix("submit") {
                      val entityRef = sharding.entityRefFor(WorkManager.EntityKey, s"$shopId:$posId")
                      entity(as[String]) { json =>
                        log.step(s"received submit($shopId,$json)")
                        //json validation
                        val reqInfo = TxnServices.parseRequest(shopId,json)
                        if (reqInfo._1) {
                          val req = reqInfo._3 
                          //mark submit time
                          val jsonReq = toJson(req.copy(submitTm = mgoDateToString(mgoDateTimeNow,"yyyyMMddHHmmssSSS")))
                          if (req.respond == 1) {
                            val futResp = entityRef.ask[WorkManager.Response](WorkManager.TakeWork(jsonReq, _))
                              .map {
                                case WorkManager.ResultMsg(json) => HttpEntity(MediaTypes.`application/json`, json)
                                case WorkManager.ErrorMsg(msg) => HttpEntity(MediaTypes.`application/json`, msg)
                              }
                            onSuccess(futResp)(complete(_))
                          } else {
                            entityRef ! WorkManager.FireupWork(jsonReq)
                            complete("OK")
                          }
                        } else {
                          log.step(s"submit($shopId,$json): ${reqInfo._2}")
                          complete(toJson(Map[String,Any]("sts" -> -1, "msg" -> reqInfo._2)))
                        }
                      }
                    } ~

如果respond=1,entityRef.ask[WorkManager.Response](WorkManager.TakeWork(jsonReq, _)) 构建了一个future session 直至收到回复或超时。

2、workManager是一种actor,负责管理请求回应地址及写入Kafka:

             case TakeWork(jsonReq,replyTo) =>
              log.step(s"WorkManager: TakeWork($jsonReq)[${entityId}]")
              val req = fromJson[TxnRequest](jsonReq)
                val work = Work(
                  req.shopId + req.seller + req.itemCode + req.reqTime,
                  Instant.now(),
                  replyTo,
                )
                workStates = workStates.addWork(work)
                log.step(s"WorkManager: adding work: $work, new workStates: ${workStates.toString()}")
                for {
                  _ <- TxnServices.logRequest(jsonReq, trace)
                  _ <- SendProducer(ps).send(new ProducerRecord[String, String](kafkaTopic, jsonReq))
                } yield "Done"
                log.step(s"WorkManager: current workStates: ${workStates.toString()}")
              Behaviors.same

 workStates是一个请求管理类,如下:

   case class Work(jobId: String, startTime: Instant, replyTo: ActorRef[Response]) {
    def timeElapsed(): FiniteDuration = {
      Duration.between(startTime, Instant.now()).toScala
    }
  }
  case class WorkStates(pendingWorks: List[Work] = Nil) { self =>
    def cleanWorkList(timeLap: FiniteDuration) = {
      val pw = pendingWorks.foldRight(List[Work]()) { (w, accu) =>
        if (w.timeElapsed() > timeLap) {
          accu
        } else w :: accu
      }
      copy(pendingWorks = pw)
    }
    def addWork(w: Work) = copy(pendingWorks = w :: pendingWorks)
    def getWork(jobId: String) = {
      pendingWorks.find { w =>
        w.jobId == jobId
      }
    }
    override def toString(): String = {
      pendingWorks.foldRight("") {(w,accu) =>
        accu + w.jobId + " " + w.startTime + "\n"
      }
    }
  }

workManager在收到txnProcessor完成数据操作后的状态回应后从workStates中找出对应的请求地址进行回应:

            case WorkResponse(rsp) =>
              log.step(s"WorkManager: WorkResponse($rsp)[${entityId}]")
              log.step(s"WorkManager: current workStates: ${workStates.toString()}")
              val jobid = rsp.shopId+rsp.seller+rsp.itemCode+rsp.reqTime
              log.step(s"WorkManager: finding work with jobId = $jobid")
              val someWork = workStates.getWork(jobid)
              log.step(s"WorkManager: WorkResponse someWork = $someWork")
              if(someWork.isDefined) {
                  someWork.get.replyTo ! ResultMsg(toJson(Map[String, Any] ("sts" -> 0, "msg" -> "", "data" -> rsp)))
                  Done(loc.shopid, loc.posid, s"got WorkResponse($rsp).")
              }
              Behaviors.same

3、Kafka reader 读出请求后按请求交易项目编号指定txnProcessor分片派送请求:

def start =
    (1 to numReaders).toList.map { _ =>
      RestartSource
        .onFailuresWithBackoff(restartSource) { () => commitableSource }
        //      .viaMat(KillSwitches.single)(Keep.right)
        .mapAsyncUnordered(32) { msg =>
          TxnServices.toTxnProcessor(msg.record.value(), responseTimeout, trace)
            .onComplete {
              case Success(res) =>
                log.step(s"AtLeastOnceReaderGroup-toTxnProcessor returns ${res}: ${msg.record.value()}")
              case Failure(err) =>
                log.step(s"AtLeastOnceReaderGroup-toTxnProcessor Error ${err.getMessage}: ${msg.record.value()}")
            }
          FastFuture.successful(msg.committableOffset)
        }
        .toMat(Committer.sink(committerSettings))(Keep.left)
      .run()
    }

mapAsyncUnordered(32)可支持32个线程同时执行TxnServices.toTxnProcessor。通过back pressure,确保每个线程在完成后才进行下一个请求读取。
TxnServices.toTxnProcessor中通过请求产生分片的entityId,然后向对应的txnProcessor分片发送请求:

  def toTxnProcessor(jsonReq: String, askTimeout: FiniteDuration, trace: Boolean)(
     implicit sharding: ClusterSharding, streamFlowTimeout: Timeout, loc: MachineId) = {
    log.stepOn = trace
    log.step(s"TxnServices-toTxnProcessor($jsonReq)")
    val txnReq = fromJson[TxnRequest](jsonReq)
    val submt = mgoStringToDateTime("yyyyMMddHHmmssSSS",txnReq.submitTm)
    if (Duration.between(submt.toInstant,Instant.now()).toScala < askTimeout) {
      val entityId = hashItemCode(txnReq.itemCode)
      log.step(s"TxnServices-toTxnProcessor: entityId = $entityId")
      val workerEntityRef = sharding.entityRefFor(TxnProcessor.EntityKey, entityId)
      if (txnReq.respond == 0) {
        if (txnReq.reqValue > 0)
          workerEntityRef.ask[TxnProcessor.Response](TxnProcessor.RunDec(txnReq, _))
        else workerEntityRef.ask[TxnProcessor.Response](TxnProcessor.RunInc(txnReq, _))
      } else {
        if (txnReq.reqValue > 0)
          workerEntityRef.ask[TxnProcessor.Response](TxnProcessor.DecValue(txnReq, _))
        else workerEntityRef.ask[TxnProcessor.Response](TxnProcessor.IncValue(txnReq, _))
      }
    } else FastFuture.successful("Skip")
  }

在TxnServices.toTxnProcessor中对请求进行了超时验证。系统出现故障重启后留在kafka队列里的请求视为无效,因为http 端请求早已经过时了。

4、txnProcessor收到请求完成操作后产生response并发送给workManager:

           case DecValue(txnreq,replyTo) =>
            log.step(s"TxnProcessor: DecValue($txnreq)[${entityId}]")
            ctx.pipeToSelf(
              for {
                decRes <- decValue(txnreq.shopId,txnreq.itemCode,txnreq.reqValue,trace)
                _ <- FastFuture.successful {
                  log.step(s"TxnProcessor: DecValue($txnreq)[${entityId}] DecValue result = $decRes")
                }
                _ <- if (decRes._1._1 == 0) {
                  logResponse(txnreq, decRes._2._2.asInstanceOf[Double], decRes._2._1.asInstanceOf[Double],trace)
                } else FastFuture.successful {
                  log.step(s"TxnProcessor: DecValue($txnreq)[${entityId}] skip logging response")
                }
              } yield decRes
           ) {
              case Success(res) => {
                val txnrsp = TxnResponse(
                  txnreq.shopId,
                  txnreq.seller,
                  txnreq.buyer,
                  txnreq.reqTime,
                  res._1._1,
                  res._1._2,
                  mgoDateToString(mgoDateTimeNow,"yyyyMMddHHmmssSSS"),
                  txnreq.itemCode,
                  res._2._2.asInstanceOf[Double],
                  res._2._1.asInstanceOf[Double],
                  txnreq.reqValue,
                )
                entityRef ! WorkManager.WorkResponse(txnrsp)
                replyTo ! DoneOp
                Done(loc.shopid, loc.posid, s"DecValue response: $txnrsp")
              }
              case Failure(err) =>
                log.error(s"TxnProcessor: DecValue Error: ${err.getMessage}[${entityId}]")
                val txnrsp = TxnResponse(
                  txnreq.shopId,
                  txnreq.seller,
                  txnreq.buyer,
                  txnreq.reqTime,
                  1,
                  err.getMessage,
                  mgoDateToString(mgoDateTimeNow,"yyyyMMddHHmmssSSS"),
                  txnreq.itemCode,
                  0,
                  0,
                  txnreq.reqValue,
                )
                entityRef ! WorkManager.WorkFailure(txnrsp)
                replyTo ! DoneOp
                Done(loc.shopid, loc.posid, s"DecValue with error: ${err.getMessage}")
            }
            Behaviors.same

workManager收到WorkResponse后从workStates找到对应work的actorRef并将response发送至http route。
另外,nrtxn支持扣减DecValue和增加IncValue。在交易过程中,一般退还库存应该优先处理,可以用actor的priorityMailbox来实现:

 prio-dispatcher {
  type = Dispatcher
  mailbox-type = "com.datatech.nrtx.server.TxnProcessor$PriorityMailbox"
}

  class PriorityMailbox (settings: Settings, cfg: Config)
    extends UnboundedPriorityMailbox( PriorityGenerator {
      case x: IncValue => 0
      case x: RunInc => 1
      case _ => 2
    }
  )
...
      val priorityDispatcher = DispatcherSelector.fromConfig("prio-dispatcher")

      log.step(s"initializing sharding for ${TxnProcessor.EntityKey} ...")
      val txnProcessorType = Entity(TxnProcessor.EntityKey) { entityContext =>
        TxnProcessor(entityContext.shard,mgoClient,indexer,entityContext.entityId,keepAlive,trace)
      }.withStopMessage(TxnProcessor.StopWorker)
      sharding.init(txnProcessorType.withEntityProps(priorityDispatcher))

这样,txnProcessor会优先处理IncValue,RunInc消息。