最近工作需要了解WebWorker,根据RTFSC原则,空下来看一下Chrome的SharedWorker源码。
ShareWorker是共用Worker,Chrome的实现调用new SharedWorker会分配独立进程,不管调用多少次都只有这一个实例。除了进程管理,SharedWorker还需要通信,Chrome中SharedWorker通过MessagePort通信。
SharedWorker.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
// 构造函数, inline SharedWorker::SharedWorker(ExecutionContext* context) // 初始化父类AbstrctWorker, // AbstractWorker继承自ActiveDOMObject,因为要在JS中使用,所以Worker需要是一个DOM Object。暂时不关注ActiveDOMObject的实现。 // AbstractWorker有一个方法,resolveURL,对传进来的url进行有效性和安全性检查。 : AbstractWorker(context) //初始化连接标志m_isBeingConnected。 , m_isBeingConnected(false) { } //接下来是SharedWorker真正创建的函数,create: PassRefPtrWillBeRawPtr<SharedWorker> SharedWorker::create(ExecutionContext* context, const String& url, const String& name, ExceptionState& exceptionState) { // SharedWorker是独立进程的 ASSERT(isMainThread()); // 因为目前还不支持worker与worker之间通信,context必须是JS中的document ASSERT_WITH_SECURITY_IMPLICATION(context->isDocument()); // 引用计数,SharedWorker会被多次引用,所以需要引用计数,以便最后一个引用退出时析构。 UseCounter::count(context, UseCounter::SharedWorkerStart); // 使用构造函数构造一个worker实例。 RefPtrWillBeRawPtr<SharedWorker> worker = adoptRefWillBeNoop(new SharedWorker(context));// SharedWorker的构造函数并没有做太多的事情,初始化父类AbstractWorker,以及一个标志,是否连接 // 通信管道,MessageChannel放到后面看。 MessageChannel* channel = MessageChannel::create(context); // port是从channel中拿到的,管道中用来通信的 worker->m_port = channel->port1(); OwnPtr<WebMessagePortChannel> remotePort = channel->port2()->disentangle(); ASSERT(remotePort); // 这个看名字就知道什么意思了 worker->suspendIfNeeded(); // 这里chrome已经注释了 // We don't currently support nested workers, so workers can only be created from documents. Document* document = toDocument(context); // 判断下能不能连上 if (!document->securityOrigin()->canAccessSharedWorkers()) { exceptionState.throwSecurityError("Access to shared workers is denied to origin '" + document->securityOrigin()->toString() + "'."); return nullptr; } // 判断url的安全性 KURL scriptURL = worker->resolveURL(url, exceptionState); if (scriptURL.isEmpty()) return nullptr; // 创建document与worker的连接 if (document->frame()->loader().client()->sharedWorkerRepositoryClient()) document->frame()->loader().client()->sharedWorkerRepositoryClient()->connect(worker.get(), remotePort.release(), scriptURL, name, exceptionState); // 返回worker的实例 return worker.release(); } SharedWorker::~SharedWorker() { } const AtomicString& SharedWorker::interfaceName() const { return EventTargetNames::SharedWorker; } bool SharedWorker::hasPendingActivity() const { return m_isBeingConnected; } |
其实SharedWorker挺简单的,进程管理,通信,下面看通信是如何实现的(急需恶补一番底层通信知识,预定下周把底层通信手段学习一遍)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
static void createChannel(MessagePort* port1, MessagePort* port2) { // 创建连接 WebMessagePortChannel* channel1; WebMessagePortChannel* channel2; Platform::current()->createMessageChannel(&channel1, &channel2); ASSERT(channel1 && channel2); // Now entangle the proxies with the appropriate local ports. port1->entangle(adoptPtr(channel2)); port2->entangle(adoptPtr(channel1)); } MessageChannel::MessageChannel(ExecutionContext* context) // MessageChannel里面就是搞两个port : m_port1(MessagePort::create(*context)) , m_port2(MessagePort::create(*context)) { createChannel(m_port1.get(), m_port2.get()); } |
好吧,这个类也不怎么干活啊,活在WebMessagePortChannel和MessagePort里面做。
MessagePort
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
// 如果这里是管道通信的话,那可能需要两组channel worker端一组,web端一组 // 这是一个干活的类,create里面做的事情不多,构造一下,返回 PassRefPtrWillBeRawPtr<MessagePort> MessagePort::create(ExecutionContext& executionContext) { RefPtrWillBeRawPtr<MessagePort> port = adoptRefWillBeNoop(new MessagePort(executionContext)); port->suspendIfNeeded(); return port.release(); } //构造函数做的也不多,创建一个跟document关联的DOM Object,初始化几个标志参数 MessagePort::MessagePort(ExecutionContext& executionContext) : ActiveDOMObject(&executionContext) , m_started(false) , m_closed(false) , m_weakFactory(this) { } MessagePort::~MessagePort() { close(); if (m_scriptStateForConversion) m_scriptStateForConversion->disposePerContextData(); } // 发消息 void MessagePort::postMessage(ExecutionContext* context, PassRefPtr<SerializedScriptValue> message, const MessagePortArray* ports, ExceptionState& exceptionState) { // entangle,好吧,英语比较渣,google翻译下,是缠的意思,Orz if (!isEntangled()) return; ASSERT(executionContext()); ASSERT(m_entangledChannel); // 一个channel数组 OwnPtr<MessagePortChannelArray> channels; // Make sure we aren't connected to any of the passed-in ports. // 防错代码 if (ports) { for (unsigned i = 0; i < ports->size(); ++i) { MessagePort* dataPort = (*ports)[i].get(); if (dataPort == this) { exceptionState.throwDOMException(DataCloneError, "Port at index " + String::number(i) + " contains the source port."); return; } } // 解绑 channels = MessagePort::disentanglePorts(context, ports, exceptionState); if (exceptionState.hadException()) return; } // 给channel发消息 WebString messageString = message->toWireString(); OwnPtr<WebMessagePortChannelArray> webChannels = toWebMessagePortChannelArray(channels.release()); // 待看 WebMessagePortChannelArray m_entangledChannel->postMessage(messageString, webChannels.leakPtr()); } // static 两组管道互相取 PassOwnPtr<WebMessagePortChannelArray> MessagePort::toWebMessagePortChannelArray(PassOwnPtr<MessagePortChannelArray> channels) { OwnPtr<WebMessagePortChannelArray> webChannels; if (channels && channels->size()) { webChannels = adoptPtr(new WebMessagePortChannelArray(channels->size())); for (size_t i = 0; i < channels->size(); ++i) (*webChannels)[i] = (*channels)[i].leakPtr(); } return webChannels.release(); } // static PassOwnPtrWillBeRawPtr<MessagePortArray> MessagePort::toMessagePortArray(ExecutionContext* context, const WebMessagePortChannelArray& webChannels) { OwnPtrWillBeRawPtr<MessagePortArray> ports = nullptr; if (!webChannels.isEmpty()) { OwnPtr<MessagePortChannelArray> channels = adoptPtr(new MessagePortChannelArray(webChannels.size())); for (size_t i = 0; i < webChannels.size(); ++i) (*channels)[i] = adoptPtr(webChannels[i]); ports = MessagePort::entanglePorts(*context, channels.release()); } return ports.release(); } // 函数字面意思是,断开缠绕关系 PassOwnPtr<WebMessagePortChannel> MessagePort::disentangle() { ASSERT(m_entangledChannel); // 将端口重置为0 m_entangledChannel->setClient(0); return m_entangledChannel.release(); } // Invoked to notify us that there are messages available for this port. // This code may be called from another thread, and so should not call any non-threadsafe APIs (i.e. should not call into the entangled channel or access mutable variables). // 通知port已经可用了,即start过了 void MessagePort::messageAvailable() { ASSERT(executionContext()); executionContext()->postTask(FROM_HERE, createCrossThreadTask(&MessagePort::dispatchMessages, m_weakFactory.createWeakPtr())); } // 类似于生命周期,start了通知一声 void MessagePort::start() { // Do nothing if we've been cloned or closed. if (!isEntangled()) return; ASSERT(executionContext()); if (m_started) return; m_started = true; messageAvailable(); } // 关掉缠绕关系 void MessagePort::close() { if (isEntangled()) m_entangledChannel->setClient(0); m_closed = true; } // 与远程channel缠绕 void MessagePort::entangle(PassOwnPtr<WebMessagePortChannel> remote) { // Only invoked to set our initial entanglement. ASSERT(!m_entangledChannel); ASSERT(executionContext()); m_entangledChannel = remote; m_entangledChannel->setClient(this); } const AtomicString& MessagePort::interfaceName() const { return EventTargetNames::MessagePort; } // 尝试从webChannel取message static bool tryGetMessageFrom(WebMessagePortChannel& webChannel, RefPtr<SerializedScriptValue>& message, OwnPtr<MessagePortChannelArray>& channels) { WebString messageString; WebMessagePortChannelArray webChannels; if (!webChannel.tryGetMessage(&messageString, webChannels)) return false; if (webChannels.size()) { channels = adoptPtr(new MessagePortChannelArray(webChannels.size())); for (size_t i = 0; i < webChannels.size(); ++i) (*channels)[i] = adoptPtr(webChannels[i]); } message = SerializedScriptValueFactory::instance().createFromWire(messageString); return true; } bool MessagePort::tryGetMessage(RefPtr<SerializedScriptValue>& message, OwnPtr<MessagePortChannelArray>& channels) { if (!m_entangledChannel) return false; return tryGetMessageFrom(*m_entangledChannel, message, channels); } // 分发消息 void MessagePort::dispatchMessages() { // Because close() doesn't cancel any in flight calls to dispatchMessages() we need to check if the port is still open before dispatch. if (m_closed) return; // Messages for contexts that are not fully active get dispatched too, but JSAbstractEventListener::handleEvent() doesn't call handlers for these. // The HTML5 spec specifies that any messages sent to a document that is not fully active should be dropped, so this behavior is OK. if (!started()) return; RefPtr<SerializedScriptValue> message; OwnPtr<MessagePortChannelArray> channels; // 从管道中拿到message while (tryGetMessage(message, channels)) { // close() in Worker onmessage handler should prevent next message from dispatching. if (executionContext()->isWorkerGlobalScope() && toWorkerGlobalScope(executionContext())->isClosing()) return; // 这里有一次绑定端口 OwnPtrWillBeRawPtr<MessagePortArray> ports = MessagePort::entanglePorts(*executionContext(), channels.release()); RefPtrWillBeRawPtr<Event> evt = MessageEvent::create(ports.release(), message.release()); dispatchEvent(evt.release(), ASSERT_NO_EXCEPTION); } } bool MessagePort::hasPendingActivity() const { // The spec says that entangled message ports should always be treated as if they have a strong reference. // We'll also stipulate that the queue needs to be open (if the app drops its reference to the port before start()-ing it, then it's not really entangled as it's unreachable). return m_started && isEntangled(); } // 解除port entangle关系? PassOwnPtr<MessagePortChannelArray> MessagePort::disentanglePorts(ExecutionContext* context, const MessagePortArray* ports, ExceptionState& exceptionState) { if (!ports || !ports->size()) return nullptr; // HashSet used to efficiently check for duplicates in the passed-in array. HashSet<MessagePort*> portSet; // Walk the incoming array - if there are any duplicate ports, or null ports or cloned ports, throw an error (per section 8.3.3 of the HTML5 spec). for (unsigned i = 0; i < ports->size(); ++i) { MessagePort* port = (*ports)[i].get(); if (!port || port->isNeutered() || portSet.contains(port)) { String type; if (!port) type = "null"; else if (port->isNeutered()) type = "already neutered"; else type = "a duplicate"; exceptionState.throwDOMException(DataCloneError, "Port at index " + String::number(i) + " is " + type + "."); return nullptr; } portSet.add(port); } UseCounter::count(context, UseCounter::MessagePortsTransferred); // Passed-in ports passed validity checks, so we can disentangle them. // 每个port disentangle OwnPtr<MessagePortChannelArray> portArray = adoptPtr(new MessagePortChannelArray(ports->size())); for (unsigned i = 0; i < ports->size(); ++i) (*portArray)[i] = (*ports)[i]->disentangle(); return portArray.release(); } // 绑定所有ports PassOwnPtrWillBeRawPtr<MessagePortArray> MessagePort::entanglePorts(ExecutionContext& context, PassOwnPtr<MessagePortChannelArray> channels) { // https://html.spec.whatwg.org/multipage/comms.html#message-ports // |ports| should be an empty array, not null even when there is no ports. if (!channels || !channels->size()) return adoptPtrWillBeNoop(new MessagePortArray()); OwnPtrWillBeRawPtr<MessagePortArray> portArray = adoptPtrWillBeNoop(new MessagePortArray(channels->size())); for (unsigned i = 0; i < channels->size(); ++i) { RefPtrWillBeRawPtr<MessagePort> port = MessagePort::create(context); port->entangle((*channels)[i].release()); (*portArray)[i] = port.release(); } return portArray.release(); } |
主要就是entangle函数里面,会做一次remote.setClient(this),然后dispatchEvent就很方便了。疑问就是 为什么postMessage会做一次disentangle
就是port看的晕晕的,还是去理解一下底层通信吧.