Spark Source Code: Communication Environment (Principles and Components)


Component communication

Driver=>Executor

Executor=>Driver

Executor=>Executor

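Netty: the communication framework

Imagine going to a restaurant:

BIO (blocking I/O): I order egg fried rice. The owner says ten people are ahead of me, so I stand and wait, doing nothing else.

NIO (non-blocking I/O): I order egg fried rice. The owner says ten people are ahead of me at about five minutes each. I can't wait that long, so I tell the owner I'll come back in fifty minutes to pick it up, and go do something else in the meantime.

AIO (asynchronous I/O): I don't go to the restaurant at all. I phone the owner, who says it's busy and the food will be delivered later. We agree on a delivery address and I go do something else. (The caller never waits or checks back, so this model performs best.)

Linux supports AIO poorly; Windows supports it better.

To approximate asynchronous I/O, Linux relies on epoll (I/O multiplexing with readiness notification).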
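
The three ordering styles in the restaurant analogy can be mimicked in a few lines of Scala. This is only an analogy built on scala.concurrent.Future, not real socket I/O and not how Netty is implemented; IoModels, cook, and the 50 ms delay are made-up names and values for illustration.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object IoModels {
  // Hypothetical kitchen: the dish is ready after a short delay.
  def cook(dish: String): Future[String] = Future {
    Thread.sleep(50)
    s"$dish ready"
  }

  // BIO style: the caller blocks until the dish is ready.
  def blocking(dish: String): String = Await.result(cook(dish), 5.seconds)

  // NIO style: the caller does other work and checks back from time to time.
  // Returns the dish and how many times we checked and found it not ready.
  def polling(dish: String): (String, Int) = {
    val order = cook(dish)
    var checks = 0
    while (!order.isCompleted) {
      checks += 1 // stand-in for "doing other things"
      Thread.sleep(5)
    }
    (Await.result(order, 1.second), checks)
  }

  // AIO style: register a callback and let the kitchen notify us.
  def callback(dish: String): String = {
    val delivered = Promise[String]()
    cook(dish).foreach(meal => delivered.success(meal))
    // Waiting here only lets the demo return a value; a real caller would move on.
    Await.result(delivered.future, 5.seconds)
  }
}
```

All three calls return the same dish; what differs is what the calling thread does while the kitchen is busy.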

Communication components

SparkContext.scala

// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
}
// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)

SparkEnv.scala

private[spark] def createDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      numCores: Int,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    assert(conf.contains(DRIVER_HOST_ADDRESS),
      s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
    assert(conf.contains(DRIVER_PORT), s"${DRIVER_PORT.key} is not set on the driver!")
    val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
    val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
    val port = conf.get(DRIVER_PORT)
    val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
      Some(CryptoStreamUtils.createKey(conf))
    } else {
      None
    }
    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER,
      bindAddress,
      advertiseAddress,
      Option(port),
      isLocal,
      numCores,
      ioEncryptionKey,
      listenerBus = listenerBus,
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }

Step into the create function:
val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, numUsableCores, !isDriver)

def create(
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    numUsableCores: Int,
    clientMode: Boolean): RpcEnv = {
  val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
    numUsableCores, clientMode)
  new NettyRpcEnvFactory().create(config)
}

Step into NettyRpcEnvFactory.create:
val nettyEnv =
  new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
    config.securityManager, config.numUsableCores)

Bind a port to provide the service:
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
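
Utils.startServiceOnPort is what lets the service fall back to another port when the requested one is already taken. Below is a rough sketch of that retry idea, assuming a simplified policy of "try the next port up". The real method has more rules (for example around port 0) that are not reproduced here, and PortRetry and startOnPort are hypothetical names.

```scala
import java.net.BindException
import scala.annotation.tailrec

object PortRetry {
  // Try to start a service on `port`; on a bind failure move to port + 1
  // until `retriesLeft` is exhausted, then rethrow the last failure.
  @tailrec
  def startOnPort[T](port: Int, retriesLeft: Int)(start: Int => T): T = {
    val attempt: Either[BindException, T] =
      try Right(start(port))
      catch { case e: BindException => Left(e) }

    attempt match {
      case Right(service)              => service
      case Left(e) if retriesLeft <= 0 => throw e
      case Left(_)                     => startOnPort(port + 1, retriesLeft - 1)(start)
    }
  }
}
```

A caller could pass something like port => new java.net.ServerSocket(port) as the start function; the sketch itself does not care what the service is.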
NettyRpcEnv.scala
Create the server:
def startServer(bindAddress: String, port: Int): Unit = {
  val bootstraps: java.util.List[TransportServerBootstrap] =
    if (securityManager.isAuthenticationEnabled()) {
      java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
    } else {
      java.util.Collections.emptyList()
    }
  server = transportContext.createServer(bindAddress, port, bootstraps)
  dispatcher.registerRpcEndpoint(
    RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}

Step into createServer:
public TransportServer createServer(
    String host, int port, List<TransportServerBootstrap> bootstraps) {
  return new TransportServer(this, host, port, this.rpcHandler, bootstraps);
}
 

At this point, the Driver's TransportServer is ready:

server = transportContext.createServer(bindAddress, port, bootstraps)

Next, an RpcEndpoint is needed:

dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))

What does RpcEndpoint do? Step into it.

RpcEndpoint.scala

It contains:

def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

So the endpoint is the side that receives messages.

It also has:

final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

Step in to see what RpcEndpointRef is for.

RpcEndpointRef.scala

def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  /**
   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
   * receive the reply within a default timeout.
   *
   * This method only sends the message once and never retries.
   */
  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

There are several ask overloads, so RpcEndpointRef is the side that sends messages.
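
To make the split concrete, here is a toy model of the two roles: an endpoint that only defines how to handle messages, and a ref that callers use to send a message and get a Future back. ToyEndpoint, ToyEndpointRef, and EchoEndpoint are invented for this sketch. Spark's real classes route messages through the Dispatcher and the network, which this sketch skips entirely.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Receiving side: describes how to handle a message and answer via `reply`.
trait ToyEndpoint {
  def receiveAndReply(reply: Any => Unit): PartialFunction[Any, Unit]
}

// Sending side: a handle that callers use instead of touching the endpoint.
final class ToyEndpointRef(endpoint: ToyEndpoint) {
  def ask[T](message: Any): Future[T] = {
    val answer = Promise[T]()
    val handler = endpoint.receiveAndReply(r => answer.trySuccess(r.asInstanceOf[T]))
    if (handler.isDefinedAt(message)) {
      handler(message)
    } else {
      // No case matched: fail the Future instead of throwing at the caller.
      answer.tryFailure(new IllegalArgumentException(s"unhandled message: $message"))
    }
    answer.future
  }
}

// Example endpoint: replies to any String with its upper-case form.
object EchoEndpoint extends ToyEndpoint {
  def receiveAndReply(reply: Any => Unit): PartialFunction[Any, Unit] = {
    case text: String => reply(text.toUpperCase)
  }
}
```

The caller only ever holds a ToyEndpointRef, which mirrors how Spark code holds an RpcEndpointRef rather than the endpoint object itself.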

Receiving messages implies the notion of an inbox.

The inbox is set up in Dispatcher.scala, through the new DedicatedMessageLoop(name, e, this) call inside registerRpcEndpoint:

var messageLoop: MessageLoop = null
      try {
        messageLoop = endpoint match {
          case e: IsolatedRpcEndpoint =>
            new DedicatedMessageLoop(name, e, this)
          case _ =>
            sharedLoop.register(name, endpoint)
            sharedLoop
        }
        endpoints.put(name, messageLoop)
      } catch {
        case NonFatal(e) =>
          endpointRefs.remove(endpoint)
          throw e
      }

Step into that constructor and the inbox is visible:

private class DedicatedMessageLoop(
    name: String,
    endpoint: IsolatedRpcEndpoint,
    dispatcher: Dispatcher)
  extends MessageLoop(dispatcher) {

  private val inbox = new Inbox(name, endpoint)
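
The inbox idea itself is small: messages are queued for one endpoint and handed to it in arrival order. Here is a minimal sketch under that assumption; ToyInbox, post, and process are hypothetical names, and Spark's real Inbox does more (lifecycle messages and concurrency control, for instance).

```scala
// A toy inbox: queue messages for one handler and drain them in FIFO order.
final class ToyInbox(val name: String, handler: PartialFunction[Any, Unit]) {
  private val messages = new java.util.LinkedList[Any]()

  // Called by senders: just enqueue, never run the handler on the sender's thread.
  def post(message: Any): Unit = synchronized { messages.add(message) }

  // Take the oldest message, if any.
  private def take(): Option[Any] = synchronized { Option(messages.poll()) }

  // Called by the message loop: deliver everything queued so far.
  // Returns how many messages were taken off the queue.
  def process(): Int = {
    var drained = 0
    var next = take()
    while (next.isDefined) {
      val message = next.get
      if (handler.isDefinedAt(message)) handler(message) // silently skip unknown messages
      drained += 1
      next = take()
    }
    drained
  }
}
```

In this sketch the message loop is whoever calls process(); a dedicated loop would call it from its own thread.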

Besides the inbox, an outbox is needed as well.

Outboxes are created in NettyRpcEnv.scala and keyed by RpcAddress, so there is one outbox per remote address:

private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
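
A small sketch of what "one outbox per address" means in practice. ToyAddress, ToyOutbox, and ToyOutboxes are made-up stand-ins, and using computeIfAbsent to create outboxes lazily is an assumption chosen for brevity, not a copy of Spark's logic.

```scala
import java.util.concurrent.ConcurrentHashMap

// Stand-in for RpcAddress: a case class, so equal host/port pairs are equal keys.
final case class ToyAddress(host: String, port: Int)

// Stand-in for Outbox: buffers messages bound for a single remote address.
final class ToyOutbox(val address: ToyAddress) {
  private val pending = new java.util.LinkedList[String]()
  def send(message: String): Unit = synchronized { pending.add(message) }
  def pendingCount: Int = synchronized { pending.size() }
}

object ToyOutboxes {
  private val outboxes = new ConcurrentHashMap[ToyAddress, ToyOutbox]()

  // Look up the outbox for this address, creating it on first use.
  def outboxFor(address: ToyAddress): ToyOutbox =
    outboxes.computeIfAbsent(address, a => new ToyOutbox(a))

  def count: Int = outboxes.size()
}
```

Messages to the same host and port share one outbox, while a different address gets its own.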

That covers the Driver.

Now turn to the Executor. CoarseGrainedExecutorBackend.scala creates the Executor's SparkEnv:

val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
        arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

Step into createExecutorEnv:

val env = create(
      conf,
      executorId,
      bindAddress,
      hostname,
      None,
      isLocal,
      numCores,
      ioEncryptionKey
    )

Step into create:

val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
      securityManager, numUsableCores, !isDriver)

Step into RpcEnv.create:

def create(
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    numUsableCores: Int,
    clientMode: Boolean): RpcEnv = {
  val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
    numUsableCores, clientMode)
  new NettyRpcEnvFactory().create(config)
}

Step into NettyRpcEnvFactory.create:

val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }

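startServer shows up again, so the Executor is built from the same parts as the Driver: TransportServer, RpcEndpoint (receiving), RpcEndpointRef (sending), inbox, and outbox. One caveat: the Executor passes clientMode = !isDriver (that is, true) to RpcEnv.create, and in the Spark source this startServer closure sits inside a check on config.clientMode, so it is worth confirming in your Spark version whether the Executor's RpcEnv actually binds a server port.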

Likewise, sending goes through a client. Outbox.scala holds a TransportClient:

private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {

  outbox => // Give this an alias so we can use it more clearly in closures.

  @GuardedBy("this")
  private val messages = new java.util.LinkedList[OutboxMessage]

  @GuardedBy("this")
  private var client: TransportClient = null

The client then connects to the server, and the two sides exchange data.
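
The connect-then-exchange step can be illustrated with plain java.net sockets on the loopback interface. This is not Netty and not Spark's TransportClient or TransportServer; LoopbackDemo, roundTrip, and the "ack:" reply format are invented for the sketch.

```scala
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.net.{InetAddress, ServerSocket, Socket}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LoopbackDemo {
  // Start a one-shot server, connect a client, send one line, return the reply.
  def roundTrip(message: String): String = {
    val loopback = InetAddress.getLoopbackAddress
    val server = new ServerSocket(0, 1, loopback) // port 0: let the OS pick a free port
    try {
      // Server side: accept one connection and acknowledge the line it receives.
      val serverSide = Future {
        val conn = server.accept()
        try {
          val in = new BufferedReader(new InputStreamReader(conn.getInputStream))
          val out = new PrintWriter(conn.getOutputStream, true)
          out.println("ack:" + in.readLine())
        } finally conn.close()
      }

      // Client side: connect, send, and read the acknowledgement.
      val client = new Socket(loopback, server.getLocalPort)
      try {
        val out = new PrintWriter(client.getOutputStream, true)
        val in = new BufferedReader(new InputStreamReader(client.getInputStream))
        out.println(message)
        val reply = in.readLine()
        Await.result(serverSide, 5.seconds)
        reply
      } finally client.close()
    } finally server.close()
  }
}
```

Both processes in a Spark cluster play both roles at different times, which is why each side needs a server to accept connections and clients to open them.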