diff --git a/src/main/php/peer/server/AsyncServer.class.php b/src/main/php/peer/server/AsyncServer.class.php index 13a55f5..bc384d1 100755 --- a/src/main/php/peer/server/AsyncServer.class.php +++ b/src/main/php/peer/server/AsyncServer.class.php @@ -93,7 +93,7 @@ public function select($socket, $handler) { // Check for readability, then handle incoming data while ($socket->isConnected() && !$socket->eof()) { - yield 'read' => $socket; + yield 'read' => null; yield from $handler->handleData($socket) ?? []; } @@ -140,7 +140,21 @@ public function schedule($seconds, $function) { } }); return $i; - } + } + + /** + * Returns a slot to watch for a given socket + * + * @param peer.Socket|peer.BSDSocket $socket + * @param int $signal the slot to signal + * @return int + */ + private function watch($socket, $signal) { + $slot= $this->select ? array_key_last($this->select) + 1 : 1; + $this->select[$slot]= $socket; + $this->continuation[$slot]= new Continuation(function() use($signal) { yield 'signal' => $signal; }); + return $slot; + } /** * Runs service until shutdown() is called. @@ -193,11 +207,17 @@ public function service() { continue; } - // `yield 'accept' => $socket`: Check for being able to read from socket - // `yield 'read' => $_`: Continue as soon as the socket becomes readable - // `yield 'write' => $_`: Continue as soon as the socket becomes writeable - // `yield 'delay' => $millis`: Wait a specified number of milliseconds - // `yield`: Continue at the next possible execution slot (`delay => 0`) + // Internal use: + // * `yield 'accept' => $socket`: Check for being able to read from socket + // * `yield 'signal' => $n`: Finish signalling task, continue slot #n immediately + // + // Public use: + // * `yield 'read' => null`: Continue once this socket becomes readable + // * `yield 'write' => null`: Continue once this socket becomes writeable + // * `yield 'read' => $socket`: Continue as soon as the socket becomes readable + // * `yield 'write' => $socket`: Continue as soon as the socket becomes writeable + // * `yield 'delay' => $millis`: Wait a specified number of milliseconds + // * `yield`: Continue at the next possible execution slot (`delay => 0`) switch ($execute->key()) { case 'accept': $socket= $execute->current(); @@ -206,24 +226,38 @@ public function service() { $wait[]= $socket->getTimeout(); break; + case 'signal': + unset($this->tasks[$i], $this->select[$i], $this->continuation[$i], $write[$i]); + $waitable[$execute->current()]= true; + $wait[]= 0; + break; + case 'write': + if ($s= $execute->current()) $i= $this->watch($s, $i); $write[$i]= true; $writeable[$i]= $this->select[$i]; $wait[]= $this->select[$i]->getTimeout(); break; case 'read': + if ($s= $execute->current()) $i= $this->watch($s, $i); unset($write[$i]); $readable[$i]= $this->select[$i]; $wait[]= $this->select[$i]->getTimeout(); break; - case 'delay': default: + case 'delay': $delay= $execute->current() / 1000; $continuation->next= $time + $delay; $waitable[$i]= true; $wait[]= $delay; break; + + default: + $continuation->next= $time; + $waitable[$i]= true; + $wait[]= 0; + break; } }