Spark Source Code Walkthrough (Communication Environment): Communication Principles and Communication Components
Component communication takes three forms:
Driver=>Executor
Executor=>Driver
Executor=>Executor
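Netty: the communication framework
An analogy: going to a restaurant to eat
BIO (blocking I/O): I order egg fried rice. The owner says there are ten people ahead of me, so I stand there and wait, doing nothing else.
NIO (non-blocking I/O): I order egg fried rice. The owner says there are ten people ahead of me at five minutes each. I can't wait that long, so I tell the owner I'll come back for it in fifty minutes and go do something else in the meantime.
AIO (asynchronous I/O): I don't go to the restaurant to order at all. I phone the owner, who says it's busy and he'll deliver the food later. We agree on a delivery address, and I go do other things; the result is brought to me (the best-performing model).
Linux's support for AIO is poor, while Windows supports it better (through IOCP).
To get asynchronous-style behavior, Linux relies on epoll, an I/O multiplexing mechanism; strictly speaking this is still the NIO model rather than true AIO.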
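The two ends of the analogy (BIO and AIO) can be sketched with Scala Futures. This is a minimal illustration under the assumption that a Future stands in for the kitchen; `RestaurantAnalogy`, `cook`, `orderBlocking` and `orderWithCallback` are hypothetical names, not Spark or Netty APIs. Blocking on the result corresponds to BIO; registering a callback that fires when the dish is ready corresponds to AIO's "deliver it to me". NIO's "come back later and check" would correspond to polling a non-blocking call, which is left out here.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RestaurantAnalogy {
  // Hypothetical "kitchen": cooking takes a while, then yields the dish.
  def cook(dish: String): Future[String] = Future {
    Thread.sleep(50) // simulate the customers queued ahead of us
    s"$dish ready"
  }

  // BIO-style: the caller's own thread blocks until the dish is ready.
  def orderBlocking(dish: String): String =
    Await.result(cook(dish), 5.seconds)

  // AIO-style: the caller registers a callback ("deliver it to me") and returns at once.
  def orderWithCallback(dish: String)(onDelivered: String => Unit): Unit =
    cook(dish).foreach(onDelivered)
}
```

The callback version returns immediately; the thread that placed the order is free, and `onDelivered` runs later on a pool thread.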
Communication components
SparkContext.scala
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
}

// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.scala
private[spark] def createDriverEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus,
    numCores: Int,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
  assert(conf.contains(DRIVER_HOST_ADDRESS),
    s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
  assert(conf.contains(DRIVER_PORT), s"${DRIVER_PORT.key} is not set on the driver!")
  val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
  val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
  val port = conf.get(DRIVER_PORT)
  val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
    Some(CryptoStreamUtils.createKey(conf))
  } else {
    None
  }
  create(
    conf,
    SparkContext.DRIVER_IDENTIFIER,
    bindAddress,
    advertiseAddress,
    Option(port),
    isLocal,
    numCores,
    ioEncryptionKey,
    listenerBus = listenerBus,
    mockOutputCommitCoordinator = mockOutputCommitCoordinator
  )
}
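Step into the create function (SparkEnv.create). It decides whether this environment belongs to the driver and then creates the RpcEnv: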
val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, numUsableCores, !isDriver)
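The three lines above decide whether this SparkEnv belongs to the driver, then derive the RPC system name and the clientMode flag (`!isDriver`) from that. A minimal standalone sketch of the decision follows. The string constants are redefined locally and, to my knowledge, match Spark's values ("driver", "sparkDriver", "sparkExecutor"); `EnvParams`, `RpcEnvParams` and `paramsFor` are hypothetical names.

```scala
object EnvParams {
  // Local copies of constants that, as far as I know, match SparkContext.DRIVER_IDENTIFIER
  // and SparkEnv.driverSystemName / executorSystemName; redefined so the sketch is standalone.
  val DriverIdentifier = "driver"
  val DriverSystemName = "sparkDriver"
  val ExecutorSystemName = "sparkExecutor"

  // Hypothetical bundle of the values that get passed on to RpcEnv.create.
  final case class RpcEnvParams(systemName: String, port: Int, clientMode: Boolean)

  // Mirrors the logic above: driver vs executor, system name, and clientMode = !isDriver.
  def paramsFor(executorId: String, port: Option[Int]): RpcEnvParams = {
    val isDriver = executorId == DriverIdentifier
    val systemName = if (isDriver) DriverSystemName else ExecutorSystemName
    RpcEnvParams(systemName, port.getOrElse(-1), clientMode = !isDriver)
  }
}
```

For example, `paramsFor("driver", Some(7077))` yields a non-client-mode environment named "sparkDriver", while an executor ID with no port yields port -1 and clientMode = true.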
RpcEnv.create (RpcEnv.scala) wraps these parameters in an RpcEnvConfig and delegates to NettyRpcEnvFactory:
def create(
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    numUsableCores: Int,
    clientMode: Boolean): RpcEnv = {
  val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
    numUsableCores, clientMode)
  new NettyRpcEnvFactory().create(config)
}
Step into NettyRpcEnvFactory.create, which constructs the NettyRpcEnv:
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager, config.numUsableCores)
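It then binds a port to provide the service. Utils.startServiceOnPort calls startNettyRpcEnv to start the server, and retries on another port if the requested one is already in use: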
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
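The port-retry idea behind Utils.startServiceOnPort can be sketched as follows. This is a simplified, hypothetical `PortRetry.startOnPort`, not Spark's implementation: it assumes a bind failure surfaces as java.net.BindException and simply moves on to the next port, whereas Spark's helper also handles port 0 (a random port), wrap-around of the port range and its own retry configuration.

```scala
import java.net.BindException
import scala.annotation.tailrec

object PortRetry {
  // Try `start` on `port`; if the port is taken, try port + 1, up to `retriesLeft` more times.
  // Returns the started service together with the port it actually bound to.
  @tailrec
  def startOnPort[T](port: Int, retriesLeft: Int)(start: Int => T): (T, Int) = {
    val result: Either[BindException, T] =
      try Right(start(port))
      catch {
        case e: BindException if retriesLeft > 0 => Left(e) // port in use: retry
      }
    result match {
      case Right(service) => (service, port)
      case Left(_)        => startOnPort(port + 1, retriesLeft - 1)(start)
    }
  }
}
```

With ports 7077 and 7078 already taken, `startOnPort(7077, 16)(start)` ends up on 7079; once the retries are used up, the last BindException propagates to the caller.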
NettyRpcEnv.scala
Creating the server (NettyRpcEnv.startServer):
def startServer(bindAddress: String, port: Int): Unit = {
  val bootstraps: java.util.List[TransportServerBootstrap] =
    if (securityManager.isAuthenticationEnabled()) {
      java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
    } else {
      java.util.Collections.emptyList()
    }
  server = transportContext.createServer(bindAddress, port, bootstraps)
  dispatcher.registerRpcEndpoint(
    RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
Step into createServer (TransportContext.java):
public TransportServer createServer(
    String host, int port, List<TransportServerBootstrap> bootstraps) {
  return new TransportServer(this, host, port, this.rpcHandler, bootstraps);
}
 
At this point, the Driver's TransportServer is ready:
server = transportContext.createServer(bindAddress, port, bootstraps)
Next comes the RpcEndpoint (the communication endpoint), which is registered with the dispatcher:
dispatcher.registerRpcEndpoint( RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
What is an RpcEndpoint for? Let's step into it.
RpcEndpoint.scala contains:
def receive: PartialFunction[Any, Unit] = {
  case _ => throw new SparkException(self + " does not implement 'receive'")
}
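So an endpoint is the receiving side: receive handles incoming messages (receiveAndReply handles messages that expect a reply), and the default implementation simply throws.
There is also: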
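Because receive is a PartialFunction[Any, Unit], an endpoint only handles the message shapes it pattern-matches. A hypothetical toy endpoint (not a Spark class; the message types are invented) illustrates this, with `applyOrElse` used to detect unmatched messages the way a caller of receive could:

```scala
object PartialFunctionEndpoint {
  // Hypothetical message types, invented for the illustration.
  sealed trait Msg
  final case class Heartbeat(executorId: String) extends Msg
  final case class StatusUpdate(taskId: Long, state: String) extends Msg

  // Toy stand-in for an RpcEndpoint: receive only covers the cases it matches.
  final class ToyEndpoint {
    val log = scala.collection.mutable.ListBuffer.empty[String]

    def receive: PartialFunction[Any, Unit] = {
      case Heartbeat(id)          => log += s"heartbeat from $id"
      case StatusUpdate(t, state) => log += s"task $t is $state"
    }
  }

  // Hand a message to the endpoint; returns false if receive has no matching case.
  def deliver(endpoint: ToyEndpoint, message: Any): Boolean = {
    var handled = true
    endpoint.receive.applyOrElse[Any, Unit](message, _ => { handled = false })
    handled
  }
}
```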
final def self: RpcEndpointRef = {
  require(rpcEnv != null, "rpcEnv has not been initialized")
  rpcEnv.endpointRef(this)
}
Let's step into RpcEndpointRef to see what it is for.
RpcEndpointRef.scala
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

/**
 * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and return a [[Future]] to
 * receive the reply within a default timeout.
 *
 * This method only sends the message once and never retries.
 */
def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
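It has several ask methods (plus send for one-way messages), so an RpcEndpointRef is the sending side: a reference used to send messages to an endpoint.
Receiving messages brings in the concept of an inbox.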
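The difference between send (fire-and-forget) and ask (returns a Future) can be illustrated with one Promise per outstanding request. The sketch below is hypothetical: `ToyRef`, `onReply` and the `transport` function are invented for illustration, and timeouts, which the real ask handles through RpcTimeout, are left out.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Future, Promise}
import scala.reflect.ClassTag

object AskSketch {
  // Hypothetical stand-in for RpcEndpointRef. `transport` pretends to put a
  // (requestId, message) pair on the wire; -1 marks a one-way message.
  final class ToyRef(transport: (Long, Any) => Unit) {
    private val nextId = new AtomicLong(0)
    private val pending = TrieMap.empty[Long, Promise[Any]]

    // Fire-and-forget, analogous to send: no reply is expected.
    def send(message: Any): Unit = transport(-1L, message)

    // Request/response, analogous to ask: register a Promise, send, return its Future.
    def ask[T: ClassTag](message: Any): Future[T] = {
      val id = nextId.incrementAndGet()
      val promise = Promise[Any]()
      pending.put(id, promise)
      transport(id, message)
      promise.future.mapTo[T] // fails the Future if the reply has the wrong type
    }

    // Invoked when the remote side answers request `id`; completes the matching Future.
    def onReply(id: Long, reply: Any): Unit =
      pending.remove(id).foreach(_.success(reply))
  }
}
```

The caller of ask gets a Future immediately; it completes only when the matching reply arrives via `onReply`.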
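The inbox is created in Dispatcher.scala, in the new DedicatedMessageLoop(name, e, this) call inside registerRpcEndpoint (endpoints that are not IsolatedRpcEndpoint are registered with the shared message loop instead):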
var messageLoop: MessageLoop = null
try {
  messageLoop = endpoint match {
    case e: IsolatedRpcEndpoint =>
      new DedicatedMessageLoop(name, e, this)
    case _ =>
      sharedLoop.register(name, endpoint)
      sharedLoop
  }
  endpoints.put(name, messageLoop)
} catch {
  case NonFatal(e) =>
    endpointRefs.remove(endpoint)
    throw e
}
Inside DedicatedMessageLoop you can see the Inbox:
private class DedicatedMessageLoop(
    name: String,
    endpoint: IsolatedRpcEndpoint,
    dispatcher: Dispatcher)
  extends MessageLoop(dispatcher) {

  private val inbox = new Inbox(name, endpoint)
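The receive path (dispatcher to inbox to endpoint) can be modeled in a few lines. This is a hypothetical, simplified sketch: `ToyDispatcher` keeps one `ToyInbox` per endpoint name and drains messages in FIFO order on the caller's thread, whereas Spark's message loops process inboxes on dedicated or shared thread pools.

```scala
import java.util.concurrent.ConcurrentHashMap

object InboxSketch {
  // Hypothetical inbox: a FIFO queue in front of one endpoint's receive function.
  final class ToyInbox(val name: String, receive: PartialFunction[Any, Unit]) {
    private val messages = new java.util.LinkedList[Any]()

    def post(message: Any): Unit = synchronized { messages.add(message) }

    // Drain everything currently queued; returns how many messages were processed.
    def process(): Int = {
      var count = 0
      var next = synchronized { messages.poll() }
      while (next != null) {
        receive.applyOrElse[Any, Unit](next, _ => ()) // unmatched messages are dropped here
        count += 1
        next = synchronized { messages.poll() }
      }
      count
    }
  }

  // Hypothetical dispatcher: routes a message to the inbox registered under a name.
  final class ToyDispatcher {
    private val inboxes = new ConcurrentHashMap[String, ToyInbox]()

    def register(name: String, receive: PartialFunction[Any, Unit]): Unit =
      if (inboxes.putIfAbsent(name, new ToyInbox(name, receive)) != null)
        throw new IllegalArgumentException(s"endpoint $name already registered")

    def postMessage(endpointName: String, message: Any): Unit =
      Option(inboxes.get(endpointName)) match {
        case Some(inbox) => inbox.post(message)
        case None        => throw new IllegalStateException(s"no endpoint named $endpointName")
      }

    // Process every inbox once; the order across different inboxes is unspecified.
    def processAll(): Int = {
      var total = 0
      inboxes.values().forEach(inbox => total += inbox.process())
      total
    }
  }
}
```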
Besides the inbox, an outbox is needed as well.
The outboxes live in NettyRpcEnv.scala and are keyed by RpcAddress, so there is one outbox per remote address:
private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
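Keying outboxes by address means each remote process gets exactly one outbox, created on first use and reused afterwards. A minimal sketch with a hypothetical `Address` case class standing in for RpcAddress; it uses computeIfAbsent for brevity, which may differ from how NettyRpcEnv actually populates its map.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}

object OutboxRegistrySketch {
  // Hypothetical stand-in for RpcAddress: a host/port pair (case class gives equals/hashCode).
  final case class Address(host: String, port: Int)

  // Placeholder outbox: only remembers the messages destined for one address.
  final class ToyOutbox(val address: Address) {
    val messages = new ConcurrentLinkedQueue[Any]()
  }

  final class ToyRpcEnv {
    private val outboxes = new ConcurrentHashMap[Address, ToyOutbox]()

    // One outbox per remote address, created atomically on first use.
    def outboxFor(address: Address): ToyOutbox =
      outboxes.computeIfAbsent(address, (a: Address) => new ToyOutbox(a))

    def send(address: Address, message: Any): Unit =
      outboxFor(address).messages.add(message)

    def outboxCount: Int = outboxes.size()
  }
}
```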
That covers the Driver.
Now back to the Executor. In CoarseGrainedExecutorBackend.scala, the executor's SparkEnv is created:
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
  arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
Step into createExecutorEnv:
val env = create(
      conf,
      executorId,
      bindAddress,
      hostname,
      None,
      isLocal,
      numCores,
      ioEncryptionKey
    )
Then step into create:
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
      securityManager, numUsableCores, !isDriver)
Then into the RpcEnv creation function, RpcEnv.create:
def create(
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    numUsableCores: Int,
    clientMode: Boolean): RpcEnv = {
  val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
    numUsableCores, clientMode)
  new NettyRpcEnvFactory().create(config)
}
Then into NettyRpcEnvFactory.create:
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
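startServer appears here again: like the Driver, the Executor has RpcEndpoints (receiving), RpcEndpointRefs (sending), inboxes and outboxes. One caveat about the TransportServer: in NettyRpcEnvFactory.create this snippet sits inside an if (!config.clientMode) check, and SparkEnv.create passes !isDriver as clientMode, so (at least in recent Spark versions) an executor's RpcEnv does not start its own RPC server and instead reaches the Driver through outgoing client connections.
Sending likewise needs a client: the Outbox (Outbox.scala) holds a TransportClient object.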
private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {

  outbox => // Give this an alias so we can use it more clearly in closures.

  @GuardedBy("this")
  private val messages = new java.util.LinkedList[OutboxMessage]

  @GuardedBy("this")
  private var client: TransportClient = null
The client then connects to the remote TransportServer, and the two sides exchange data.
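The Outbox fields quoted above (a message list plus a client that starts out null) suggest the pattern: buffer messages until a connection exists, then flush them in order. A hypothetical single-connection sketch, where `ToyClient` stands in for TransportClient and connecting is simulated by calling `onConnected`; the real Outbox also connects asynchronously and deals with connection failures and shutdown.

```scala
object OutboxSketch {
  // Hypothetical stand-in for TransportClient.
  trait ToyClient { def write(message: String): Unit }

  // Buffers messages while there is no client; flushes them in FIFO order once connected.
  final class ToyOutbox {
    private val messages = new java.util.LinkedList[String]()
    private var client: ToyClient = null

    def send(message: String): Unit = synchronized {
      messages.add(message)
      if (client != null) drain() // already connected: write straight through
    }

    // Called once the connection to the remote server is established.
    def onConnected(c: ToyClient): Unit = synchronized {
      client = c
      drain() // flush everything queued while connecting
    }

    // Caller must hold the lock and have a non-null client.
    private def drain(): Unit = {
      var next = messages.poll()
      while (next != null) {
        client.write(next)
        next = messages.poll()
      }
    }
  }
}
```

Messages sent before the connection exists are not lost: they wait in the queue and go out first, ahead of anything sent after connecting.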