FLINK-算子


 CONNECT 和 UNION 和 COMAP 和 COFLATMAP

union虽然可以合并多个数据流,但有一个限制,即多个数据流的数据类型必须相同。connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

  1. connect只能连接两个数据流,union可以连接多个数据流。
  2. connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
  3. 两个DataStream经过connect之后被转化为ConnectedStreamsConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

connect经常被应用在对一个数据流使用另外一个流进行控制处理的场景上。控制流可以是阈值、规则、机器学习模型或其他参数。

用法:

val first = ...
val second = ...
val connected = first.connect(second)

ConnectedStreams提供了map()flatMap()方法,分别需要接收类型为CoMapFunctionCoFlatMapFunction的参数。

以上两个函数里面的泛型是第一条流的事件类型和第二条流的事件类型,以及输出流的事件类型。还定义了两个方法,每一个方法针对一条流来调用。map1()flatMap1()会调用在第一条流的元素上面,map2()flatMap2()会调用在第二条流的元素上面。

// IN1: 第一条流的事件类型
// IN2: 第二条流的事件类型
// OUT: 输出流的事件类型
CoMapFunction[IN1, IN2, OUT]
    > map1(IN1): OUT
    > map2(IN2): OUT

CoFlatMapFunction[IN1, IN2, OUT]
    > flatMap1(IN1, Collector[OUT]): Unit
    > flatMap2(IN2, Collector[OUT]): Unit

函数无法选择读某一条流。我们是无法控制函数中的两个方法的调用顺序的。当一条流中的元素到来时,将会调用相对应的方法。

对两条流做连接查询通常需要这两条流基于某些条件被确定性的路由到操作符中相同的并行实例里面去。在默认情况下,connect()操作将不会对两条流的事件建立任何关系,所以两条流的事件将会随机的被发送到下游的算子实例里面去。这样的行为会产生不确定性的计算结果,显然不是我们想要的。为了针对ConnectedStreams进行确定性的转换操作,connect()方法可以和keyBy()或者broadcast()组合起来使用。我们首先看一下keyBy()的示例。

val one = ...
val two = ...

val keyedConnect1 = one.connect(two).keyBy(0, 0)

val keyedConnect2 = one.keyBy(0).connect(two.keyBy(0))

无论使用keyBy()算子操作ConnectedStreams还是使用connect()算子连接两条KeyedStreams,connect()算子会将两条流的含有相同Key的所有事件都发送到相同的算子实例。两条流的key必须是一样的类型和值,就像SQL中的JOIN。在connected和keyed stream上面执行的算子有访问keyed state的权限。

连接一条DataStream和广播过的流:

val one = ...
val two = ...

val keyedConnect = first.connect(second.broadcast())

JOIN

Join操作DataStream时只能用在window中,和cogroup操作一样。

  1. Tumbling Window Join
  2. Sliding Window Join
  3. Session Window Join
  4. Interval Join
stream.join(otherStream)
    .where()
    .equalTo()
    .window()
    .apply()

Interval Join编程模型:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream orangeStream = ...
DataStream greenStream = ...

orangeStream
    .keyBy()
    .intervalJoin(greenStream.keyBy())
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector out) {
            out.collect(first + "," + second);
        }
    });

附:

Broadcast State使用场景

无论是分布式批处理还是流处理,将部分数据同步到所有实例上是一个十分常见的需求。例如,我们需要依赖一个不断变化的控制规则来处理主数据流的数据,主数据流数据量比较大,只能分散到多个算子实例上,控制规则数据相对比较小,可以分发到所有的算子实例上。Broadcast State与直接在时间窗口进行两个数据流的Join的不同点在于,控制规则数据量较小,可以直接放到每个算子实例里,这样可以大大提高主数据流的处理速度。

电商用户行为识别案例

下面开始具体构建一个实例程序。第一步,我们定义一些必要的数据结构来描述这个业务场景,包括用户行为和规则模式两个数据结构。

/**
    * 用户行为
    * categoryId为商品类目ID
    * behavior包括点击(pv)、购买(buy)、加购物车(cart)、喜欢(fav)
    * */
case class UserBehavior(userId: Long,
                        itemId: Long,
                        categoryId: Int,
                        behavior: String,
                        timestamp: Long)

/**
    * 行为模式
    * 整个模式简化为两个行为
    * */
case class BehaviorPattern(firstBehavior: String, secondBehavior: String)

// 主数据流
val userBehaviorStream: DataStream[UserBehavior] = ...
// BehaviorPattern数据流
val patternStream: DataStream[BehaviorPattern] = ...

// Broadcast State只能使用 Key->Value 结构,基于MapStateDescriptor
val broadcastStateDescriptor =
new MapStateDescriptor[Void, BehaviorPattern]("behaviorPattern", classOf[Void], classOf[BehaviorPattern])
val broadcastStream: BroadcastStream[BehaviorPattern] = patternStream
.broadcast(broadcastStateDescriptor)

// 生成一个KeyedStream
val keyedStream =  userBehaviorStream.keyBy(user => user.userId)
// 在KeyedStream上进行connect和process
val matchedStream = keyedStream
  .connect(broadcastStream)
  .process(new BroadcastPatternFunction)

/**
    * 四个泛型分别为:
    * 1. KeyedStream中Key的数据类型
    * 2. 主数据流的数据类型
    * 3. 广播流的数据类型
    * 4. 输出类型
    * */
class BroadcastPatternFunction
extends KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)] {

  // 用户上次性能状态句柄,每个用户存储一个状态
  private var lastBehaviorState: ValueState[String] = _
  // Broadcast State Descriptor
  private var bcPatternDesc: MapStateDescriptor[Void, BehaviorPattern] = _

  override def open(parameters: Configuration): Unit = {

    lastBehaviorState = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("lastBehaviorState", classOf[String])
    )

    bcPatternDesc = new MapStateDescriptor[Void, BehaviorPattern]("behaviorPattern", classOf[Void], classOf[BehaviorPattern])

  }

  // 当BehaviorPattern流有新数据时,更新BroadcastState
  override def processBroadcastElement(pattern: BehaviorPattern,
                                       context: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#Context,
                                       collector: Collector[(Long, BehaviorPattern)]): Unit = {

    val bcPatternState: BroadcastState[Void, BehaviorPattern] = context.getBroadcastState(bcPatternDesc)
    // 将新数据更新至Broadcast State,这里使用一个null作为Key
    // 在本场景中所有数据都共享一个Pattern,因此这里伪造了一个Key
    bcPatternState.put(null, pattern)
  }

  override def processElement(userBehavior: UserBehavior,
                              context: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#ReadOnlyContext,
                              collector: Collector[(Long, BehaviorPattern)]): Unit = {

    // 获取最新的Broadcast State
    val pattern: BehaviorPattern = context.getBroadcastState(bcPatternDesc).get(null)
    val lastBehavior: String = lastBehaviorState.value()
    if (pattern != null && lastBehavior != null) {
      // 用户之前有过行为,检查是否符合给定的模式
      if (pattern.firstBehavior.equals(lastBehavior) &&
          pattern.secondBehavior.equals(userBehavior.behavior))
      // 当前用户行为符合模式
      collector.collect((userBehavior.userId, pattern))
    }
    lastBehaviorState.update(userBehavior.behavior)
  }
}