Initial Drupal 11 with DDEV setup

This commit is contained in:
gluebox
2025-10-08 11:39:17 -04:00
commit 89ef74b305
25344 changed files with 2599172 additions and 0 deletions

25
vendor/revolt/event-loop/LICENSE vendored Normal file
View File

@ -0,0 +1,25 @@
The MIT License (MIT)
Copyright (c) 2021-2023 Revolt (Aaron Piotrowski, Cees-Jan Kiewiet, Christian Lück, Niklas Keller, and contributors)
Copyright (c) 2015-2021 amphp (Daniel Lowrey, Aaron Piotrowski, Niklas Keller, Bob Weinand, and contributors)
Copyright (c) 2012-2021 ReactPHP (Christian Lück, Cees-Jan Kiewiet, Jan Sorgalla, Chris Boden, Igor Wiedler, and contributors)
Copyright (c) 2016 PHP Asynchronous Interoperability Group
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

12
vendor/revolt/event-loop/SECURITY.md vendored Normal file
View File

@ -0,0 +1,12 @@
# Security Policy
## Supported Versions
| Version | Supported |
| ------- | ------------------ |
| 1.x | ✔️ |
| < 1.x | :x: |
## Reporting a Vulnerability
If you think you've found a vulnerability, please use the [private vulnerability reporting form provided by Github](https://docs.github.com/en/code-security/security-advisories/guidance-on-reporting-and-writing-information-about-vulnerabilities/privately-reporting-a-security-vulnerability#privately-reporting-a-security-vulnerability).

63
vendor/revolt/event-loop/composer.json vendored Normal file
View File

@ -0,0 +1,63 @@
{
"name": "revolt/event-loop",
"description": "Rock-solid event loop for concurrent PHP applications.",
"keywords": [
"async",
"asynchronous",
"concurrency",
"non-blocking",
"event",
"event-loop",
"scheduler"
],
"license": "MIT",
"authors": [
{
"name": "Aaron Piotrowski",
"email": "aaron@trowski.com"
},
{
"name": "Cees-Jan Kiewiet",
"email": "ceesjank@gmail.com"
},
{
"name": "Christian Lück",
"email": "christian@clue.engineering"
},
{
"name": "Niklas Keller",
"email": "me@kelunik.com"
}
],
"require": {
"php": ">=8.1"
},
"require-dev": {
"ext-json": "*",
"phpunit/phpunit": "^9",
"jetbrains/phpstorm-stubs": "^2019.3",
"psalm/phar": "^5.15"
},
"autoload": {
"psr-4": {
"Revolt\\": "src"
}
},
"autoload-dev": {
"psr-4": {
"Revolt\\EventLoop\\": "test"
}
},
"support": {
"issues": "https://github.com/revoltphp/event-loop/issues"
},
"extra": {
"branch-alias": {
"dev-main": "1.x-dev"
}
},
"scripts": {
"test": "@php -dzend.assertions=1 -dassert.exception=1 ./vendor/bin/phpunit",
"code-style": "@php tools/php-cs-fixer/vendor/bin/php-cs-fixer fix"
}
}

View File

@ -0,0 +1,416 @@
<?php
declare(strict_types=1);
namespace Revolt;
use Revolt\EventLoop\CallbackType;
use Revolt\EventLoop\Driver;
use Revolt\EventLoop\DriverFactory;
use Revolt\EventLoop\Internal\AbstractDriver;
use Revolt\EventLoop\Internal\DriverCallback;
use Revolt\EventLoop\InvalidCallbackError;
use Revolt\EventLoop\Suspension;
use Revolt\EventLoop\UnsupportedFeatureException;
/**
* Accessor to allow global access to the event loop.
*
* @see Driver
*/
final class EventLoop
{
private static Driver $driver;
/**
* Sets the driver to be used as the event loop.
*/
public static function setDriver(Driver $driver): void
{
/** @psalm-suppress RedundantPropertyInitializationCheck, RedundantCondition */
if (isset(self::$driver) && self::$driver->isRunning()) {
throw new \Error("Can't swap the event loop driver while the driver is running");
}
try {
/** @psalm-suppress InternalClass */
self::$driver = new class () extends AbstractDriver {
protected function activate(array $callbacks): void
{
throw new \Error("Can't activate callback during garbage collection.");
}
protected function dispatch(bool $blocking): void
{
throw new \Error("Can't dispatch during garbage collection.");
}
protected function deactivate(DriverCallback $callback): void
{
// do nothing
}
public function getHandle(): mixed
{
return null;
}
protected function now(): float
{
return (float) \hrtime(true) / 1_000_000_000;
}
};
\gc_collect_cycles();
} finally {
self::$driver = $driver;
}
}
/**
* Queue a microtask.
*
* The queued callback MUST be executed immediately once the event loop gains control. Order of queueing MUST be
* preserved when executing the callbacks. Recursive scheduling can thus result in infinite loops, use with care.
*
* Does NOT create an event callback, thus CAN NOT be marked as disabled or unreferenced.
* Use {@see EventLoop::defer()} if you need these features.
*
* @param \Closure(...):void $closure The callback to queue.
* @param mixed ...$args The callback arguments.
*/
public static function queue(\Closure $closure, mixed ...$args): void
{
self::getDriver()->queue($closure, ...$args);
}
/**
* Defer the execution of a callback.
*
* The deferred callback MUST be executed before any other type of callback in a tick. Order of enabling MUST be
* preserved when executing the callbacks.
*
* The created callback MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Deferred callbacks MUST NOT be called in the tick they were enabled.
*
* @param \Closure(string):void $closure The callback to defer. The `$callbackId` will be
* invalidated before the callback invocation.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
*/
public static function defer(\Closure $closure): string
{
return self::getDriver()->defer($closure);
}
/**
* Delay the execution of a callback.
*
* The delay is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be determined by which
* timers expire first, but timers with the same expiration time MAY be executed in any order.
*
* The created callback MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
*
* @param float $delay The amount of time, in seconds, to delay the execution for.
* @param \Closure(string):void $closure The callback to delay. The `$callbackId` will be invalidated
* before the callback invocation.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
*/
public static function delay(float $delay, \Closure $closure): string
{
return self::getDriver()->delay($delay, $closure);
}
/**
* Repeatedly execute a callback.
*
* The interval between executions is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be
* determined by which timers expire first, but timers with the same expiration time MAY be executed in any order.
* The first execution is scheduled after the first interval period.
*
* The created callback MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
*
* @param float $interval The time interval, in seconds, to wait between executions.
* @param \Closure(string):void $closure The callback to repeat.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
*/
public static function repeat(float $interval, \Closure $closure): string
{
return self::getDriver()->repeat($interval, $closure);
}
/**
* Execute a callback when a stream resource becomes readable or is closed for reading.
*
* Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. Be sure to `cancel` the
* callback when closing the resource locally. Drivers MAY choose to notify the user if there are callbacks on
* invalid resources, but are not required to, due to the high performance impact. Callbacks on closed resources are
* therefore undefined behavior.
*
* Multiple callbacks on the same stream MAY be executed in any order.
*
* The created callback MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
*
* @param resource $stream The stream to monitor.
* @param \Closure(string, resource):void $closure The callback to execute.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
*/
public static function onReadable(mixed $stream, \Closure $closure): string
{
return self::getDriver()->onReadable($stream, $closure);
}
/**
* Execute a callback when a stream resource becomes writable or is closed for writing.
*
* Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. Be sure to `cancel` the
* callback when closing the resource locally. Drivers MAY choose to notify the user if there are callbacks on
* invalid resources, but are not required to, due to the high performance impact. Callbacks on closed resources are
* therefore undefined behavior.
*
* Multiple callbacks on the same stream MAY be executed in any order.
*
* The created callback MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
*
* @param resource $stream The stream to monitor.
* @param \Closure(string, resource):void $closure The callback to execute.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
*/
public static function onWritable(mixed $stream, \Closure $closure): string
{
return self::getDriver()->onWritable($stream, $closure);
}
/**
* Execute a callback when a signal is received.
*
* Warning: Installing the same signal on different instances of this interface is deemed undefined behavior.
* Implementations MAY try to detect this, if possible, but are not required to. This is due to technical
* limitations of the signals being registered globally per process.
*
* Multiple callbacks on the same signal MAY be executed in any order.
*
* The created callback MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
*
* @param int $signal The signal number to monitor.
* @param \Closure(string, int):void $closure The callback to execute.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
*
* @throws UnsupportedFeatureException If signal handling is not supported.
*/
public static function onSignal(int $signal, \Closure $closure): string
{
return self::getDriver()->onSignal($signal, $closure);
}
/**
* Enable a callback to be active starting in the next tick.
*
* Callbacks MUST immediately be marked as enabled, but only be activated (i.e. callbacks can be called) right
* before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
*
* @param string $callbackId The callback identifier.
*
* @return string The callback identifier.
*
* @throws InvalidCallbackError If the callback identifier is invalid.
*/
public static function enable(string $callbackId): string
{
return self::getDriver()->enable($callbackId);
}
/**
* Disable a callback immediately.
*
* A callback MUST be disabled immediately, e.g. if a deferred callback disables another deferred callback,
* the second deferred callback isn't executed in this tick.
*
* Disabling a callback MUST NOT invalidate the callback. Calling this function MUST NOT fail, even if passed an
* invalid callback identifier.
*
* @param string $callbackId The callback identifier.
*
* @return string The callback identifier.
*/
public static function disable(string $callbackId): string
{
return self::getDriver()->disable($callbackId);
}
/**
* Cancel a callback.
*
* This will detach the event loop from all resources that are associated to the callback. After this operation the
* callback is permanently invalid. Calling this function MUST NOT fail, even if passed an invalid identifier.
*
* @param string $callbackId The callback identifier.
*/
public static function cancel(string $callbackId): void
{
self::getDriver()->cancel($callbackId);
}
/**
* Reference a callback.
*
* This will keep the event loop alive whilst the event is still being monitored. Callbacks have this state by
* default.
*
* @param string $callbackId The callback identifier.
*
* @return string The callback identifier.
*
* @throws InvalidCallbackError If the callback identifier is invalid.
*/
public static function reference(string $callbackId): string
{
return self::getDriver()->reference($callbackId);
}
/**
* Unreference a callback.
*
* The event loop should exit the run method when only unreferenced callbacks are still being monitored. Callbacks
* are all referenced by default.
*
* @param string $callbackId The callback identifier.
*
* @return string The callback identifier.
*/
public static function unreference(string $callbackId): string
{
return self::getDriver()->unreference($callbackId);
}
/**
* Set a callback to be executed when an error occurs.
*
* The callback receives the error as the first and only parameter. The return value of the callback gets ignored.
* If it can't handle the error, it MUST throw the error. Errors thrown by the callback or during its invocation
* MUST be thrown into the `run` loop and stop the driver.
*
* Subsequent calls to this method will overwrite the previous handler.
*
* @param null|\Closure(\Throwable):void $errorHandler The callback to execute. `null` will clear the current handler.
*/
public static function setErrorHandler(?\Closure $errorHandler): void
{
self::getDriver()->setErrorHandler($errorHandler);
}
/**
* Gets the error handler closure or {@code null} if none is set.
*
* @return null|\Closure(\Throwable):void The previous handler, `null` if there was none.
*/
public static function getErrorHandler(): ?\Closure
{
return self::getDriver()->getErrorHandler();
}
/**
* Returns all registered non-cancelled callback identifiers.
*
* @return string[] Callback identifiers.
*/
public static function getIdentifiers(): array
{
return self::getDriver()->getIdentifiers();
}
/**
* Returns the type of the callback identified by the given callback identifier.
*
* @param string $callbackId The callback identifier.
*
* @return CallbackType The callback type.
*/
public static function getType(string $callbackId): CallbackType
{
return self::getDriver()->getType($callbackId);
}
/**
* Returns whether the callback identified by the given callback identifier is currently enabled.
*
* @param string $callbackId The callback identifier.
*
* @return bool `true` if the callback is currently enabled, otherwise `false`.
*/
public static function isEnabled(string $callbackId): bool
{
return self::getDriver()->isEnabled($callbackId);
}
/**
* Returns whether the callback identified by the given callback identifier is currently referenced.
*
* @param string $callbackId The callback identifier.
*
* @return bool `true` if the callback is currently referenced, otherwise `false`.
*/
public static function isReferenced(string $callbackId): bool
{
return self::getDriver()->isReferenced($callbackId);
}
/**
* Retrieve the event loop driver that is in scope.
*
* @return Driver
*/
public static function getDriver(): Driver
{
/** @psalm-suppress RedundantPropertyInitializationCheck, RedundantCondition */
if (!isset(self::$driver)) {
self::setDriver((new DriverFactory())->create());
}
return self::$driver;
}
/**
* Returns an object used to suspend and resume execution of the current fiber or {main}.
*
* Calls from the same fiber will return the same suspension object.
*
* @return Suspension
*/
public static function getSuspension(): Suspension
{
return self::getDriver()->getSuspension();
}
/**
* Run the event loop.
*
* This function may only be called from {main}, that is, not within a fiber.
*
* Libraries should use the {@link Suspension} API instead of calling this method.
*
* This method will not return until the event loop does not contain any pending, referenced callbacks anymore.
*/
public static function run(): void
{
self::getDriver()->run();
}
/**
* Disable construction as this is a static class.
*/
private function __construct()
{
// intentionally left blank
}
}

View File

@ -0,0 +1,15 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop;
enum CallbackType
{
case Defer;
case Delay;
case Repeat;
case Readable;
case Writable;
case Signal;
}

View File

@ -0,0 +1,318 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop;
/**
* The driver MUST run in its own fiber and execute callbacks in a separate fiber. If fibers are reused, the driver
* needs to call {@see FiberLocal::clear()} after running the callback.
*/
interface Driver
{
/**
* Run the event loop.
*
* One iteration of the loop is called one "tick". A tick covers the following steps:
*
* 1. Activate callbacks created / enabled in the last tick / before `run()`.
* 2. Execute all enabled deferred callbacks.
* 3. Execute all due timer, pending signal and actionable stream callbacks, each only once per tick.
*
* The loop MUST continue to run until it is either stopped explicitly, no referenced callbacks exist anymore, or an
* exception is thrown that cannot be handled. Exceptions that cannot be handled are exceptions thrown from an
* error handler or exceptions that would be passed to an error handler but none exists to handle them.
*
* @throws \Error Thrown if the event loop is already running.
*/
public function run(): void;
/**
* Stop the event loop.
*
* When an event loop is stopped, it continues with its current tick and exits the loop afterwards. Multiple calls
* to stop MUST be ignored and MUST NOT raise an exception.
*/
public function stop(): void;
/**
* Returns an object used to suspend and resume execution of the current fiber or {main}.
*
* Calls from the same fiber will return the same suspension object.
*
* @return Suspension
*/
public function getSuspension(): Suspension;
/**
* @return bool True if the event loop is running, false if it is stopped.
*/
public function isRunning(): bool;
/**
* Queue a microtask.
*
* The queued callback MUST be executed immediately once the event loop gains control. Order of queueing MUST be
* preserved when executing the callbacks. Recursive scheduling can thus result in infinite loops, use with care.
*
* Does NOT create an event callback, thus CAN NOT be marked as disabled or unreferenced.
* Use {@see EventLoop::defer()} if you need these features.
*
* @param \Closure(...):void $closure The callback to queue.
* @param mixed ...$args The callback arguments.
*/
public function queue(\Closure $closure, mixed ...$args): void;
/**
* Defer the execution of a callback.
*
* The deferred callback MUST be executed before any other type of callback in a tick. Order of enabling MUST be
* preserved when executing the callbacks.
*
* The created callback MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
*
* @param \Closure(string):void $closure The callback to defer. The `$callbackId` will be invalidated before the
* callback invocation.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
*/
public function defer(\Closure $closure): string;
/**
* Delay the execution of a callback.
*
* The delay is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be determined by which
* timers expire first, but timers with the same expiration time MAY be executed in any order.
*
* The created callback MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
*
* @param float $delay The amount of time, in seconds, to delay the execution for.
* @param \Closure(string):void $closure The callback to delay. The `$callbackId` will be invalidated before the
* callback invocation.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
*/
public function delay(float $delay, \Closure $closure): string;
/**
* Repeatedly execute a callback.
*
* The interval between executions is a minimum and approximate, accuracy is not guaranteed. Order of calls MUST be
* determined by which timers expire first, but timers with the same expiration time MAY be executed in any order.
* The first execution is scheduled after the first interval period.
*
* The created callback MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
*
* @param float $interval The time interval, in seconds, to wait between executions.
* @param \Closure(string):void $closure The callback to repeat.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
*/
public function repeat(float $interval, \Closure $closure): string;
/**
* Execute a callback when a stream resource becomes readable or is closed for reading.
*
* Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. Be sure to `cancel` the
* callback when closing the resource locally. Drivers MAY choose to notify the user if there are callbacks on
* invalid resources, but are not required to, due to the high performance impact. Callbacks on closed resources are
* therefore undefined behavior.
*
* Multiple callbacks on the same stream MAY be executed in any order.
*
* The created callback MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
*
* @param resource $stream The stream to monitor.
* @param \Closure(string, resource):void $closure The callback to execute.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
*/
public function onReadable(mixed $stream, \Closure $closure): string;
/**
* Execute a callback when a stream resource becomes writable or is closed for writing.
*
* Warning: Closing resources locally, e.g. with `fclose`, might not invoke the callback. Be sure to `cancel` the
* callback when closing the resource locally. Drivers MAY choose to notify the user if there are callbacks on
* invalid resources, but are not required to, due to the high performance impact. Callbacks on closed resources are
* therefore undefined behavior.
*
* Multiple callbacks on the same stream MAY be executed in any order.
*
* The created callback MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
*
* @param resource $stream The stream to monitor.
* @param \Closure(string, resource):void $closure The callback to execute.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
*/
public function onWritable(mixed $stream, \Closure $closure): string;
/**
* Execute a callback when a signal is received.
*
* Warning: Installing the same signal on different instances of this interface is deemed undefined behavior.
* Implementations MAY try to detect this, if possible, but are not required to. This is due to technical
* limitations of the signals being registered globally per process.
*
* Multiple callbacks on the same signal MAY be executed in any order.
*
* The created callback MUST immediately be marked as enabled, but only be activated (i.e. callback can be called)
* right before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
*
* @param int $signal The signal number to monitor.
* @param \Closure(string, int):void $closure The callback to execute.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
*
* @throws UnsupportedFeatureException If signal handling is not supported.
*/
public function onSignal(int $signal, \Closure $closure): string;
/**
* Enable a callback to be active starting in the next tick.
*
* Callbacks MUST immediately be marked as enabled, but only be activated (i.e. callbacks can be called) right
* before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
*
* @param string $callbackId The callback identifier.
*
* @return string The callback identifier.
*
* @throws InvalidCallbackError If the callback identifier is invalid.
*/
public function enable(string $callbackId): string;
/**
* Cancel a callback.
*
* This will detach the event loop from all resources that are associated to the callback. After this operation the
* callback is permanently invalid. Calling this function MUST NOT fail, even if passed an invalid identifier.
*
* @param string $callbackId The callback identifier.
*/
public function cancel(string $callbackId): void;
/**
* Disable a callback immediately.
*
* A callback MUST be disabled immediately, e.g. if a deferred callback disables a later deferred callback,
* the second deferred callback isn't executed in this tick.
*
* Disabling a callback MUST NOT invalidate the callback. Calling this function MUST NOT fail, even if passed an
* invalid callback identifier.
*
* @param string $callbackId The callback identifier.
*
* @return string The callback identifier.
*/
public function disable(string $callbackId): string;
/**
* Reference a callback.
*
* This will keep the event loop alive whilst the callback is still being monitored. Callbacks have this state by
* default.
*
* @param string $callbackId The callback identifier.
*
* @return string The callback identifier.
*
* @throws InvalidCallbackError If the callback identifier is invalid.
*/
public function reference(string $callbackId): string;
/**
* Unreference a callback.
*
* The event loop should exit the run method when only unreferenced callbacks are still being monitored. Callbacks
* are all referenced by default.
*
* @param string $callbackId The callback identifier.
*
* @return string The callback identifier.
*/
public function unreference(string $callbackId): string;
/**
* Set a callback to be executed when an error occurs.
*
* The callback receives the error as the first and only parameter. The return value of the callback gets ignored.
* If it can't handle the error, it MUST throw the error. Errors thrown by the callback or during its invocation
* MUST be thrown into the `run` loop and stop the driver.
*
* Subsequent calls to this method will overwrite the previous handler.
*
* @param null|\Closure(\Throwable):void $errorHandler The callback to execute. `null` will clear the current
* handler.
*/
public function setErrorHandler(?\Closure $errorHandler): void;
/**
* Gets the error handler closure or {@code null} if none is set.
*
* @return null|\Closure(\Throwable):void The previous handler, `null` if there was none.
*/
public function getErrorHandler(): ?\Closure;
/**
* Get the underlying loop handle.
*
* Example: the `uv_loop` resource for `libuv` or the `EvLoop` object for `libev` or `null` for a stream_select
* driver.
*
* Note: This function is *not* exposed in the `Loop` class. Users shall access it directly on the respective loop
* instance.
*
* @return null|object|resource The loop handle the event loop operates on. `null` if there is none.
*/
public function getHandle(): mixed;
/**
* Returns all registered non-cancelled callback identifiers.
*
* @return string[] Callback identifiers.
*/
public function getIdentifiers(): array;
/**
* Returns the type of the callback identified by the given callback identifier.
*
* @param string $callbackId The callback identifier.
*
* @return CallbackType The callback type.
*/
public function getType(string $callbackId): CallbackType;
/**
* Returns whether the callback identified by the given callback identifier is currently enabled.
*
* @param string $callbackId The callback identifier.
*
* @return bool `true` if the callback is currently enabled, otherwise `false`.
*/
public function isEnabled(string $callbackId): bool;
/**
* Returns whether the callback identified by the given callback identifier is currently referenced.
*
* @param string $callbackId The callback identifier.
*
* @return bool `true` if the callback is currently referenced, otherwise `false`.
*/
public function isReferenced(string $callbackId): bool;
/**
* Returns some useful information about the event loop.
*
* If this method isn't implemented, dumping the event loop in a busy application, even indirectly, is a pain.
*
* @return array
*/
public function __debugInfo(): array;
}

View File

@ -0,0 +1,219 @@
<?php
declare(strict_types=1);
/** @noinspection PhpComposerExtensionStubsInspection */
namespace Revolt\EventLoop\Driver;
use Revolt\EventLoop\Internal\AbstractDriver;
use Revolt\EventLoop\Internal\DriverCallback;
use Revolt\EventLoop\Internal\SignalCallback;
use Revolt\EventLoop\Internal\StreamCallback;
use Revolt\EventLoop\Internal\StreamReadableCallback;
use Revolt\EventLoop\Internal\StreamWritableCallback;
use Revolt\EventLoop\Internal\TimerCallback;
final class EvDriver extends AbstractDriver
{
/** @var array<string, \EvSignal>|null */
private static ?array $activeSignals = null;
public static function isSupported(): bool
{
return \extension_loaded("ev");
}
private \EvLoop $handle;
/** @var array<string, \EvWatcher> */
private array $events = [];
private readonly \Closure $ioCallback;
private readonly \Closure $timerCallback;
private readonly \Closure $signalCallback;
/** @var array<string, \EvSignal> */
private array $signals = [];
public function __construct()
{
parent::__construct();
$this->handle = new \EvLoop();
if (self::$activeSignals === null) {
self::$activeSignals = &$this->signals;
}
$this->ioCallback = function (\EvIo $event): void {
/** @var StreamCallback $callback */
$callback = $event->data;
$this->enqueueCallback($callback);
};
$this->timerCallback = function (\EvTimer $event): void {
/** @var TimerCallback $callback */
$callback = $event->data;
$this->enqueueCallback($callback);
};
$this->signalCallback = function (\EvSignal $event): void {
/** @var SignalCallback $callback */
$callback = $event->data;
$this->enqueueCallback($callback);
};
}
/**
* {@inheritdoc}
*/
public function cancel(string $callbackId): void
{
parent::cancel($callbackId);
unset($this->events[$callbackId]);
}
public function __destruct()
{
foreach ($this->events as $event) {
/** @psalm-suppress all */
if ($event !== null) { // Events may have been nulled in extension depending on destruct order.
$event->stop();
}
}
// We need to clear all references to events manually, see
// https://bitbucket.org/osmanov/pecl-ev/issues/31/segfault-in-ev_timer_stop
$this->events = [];
}
/**
* {@inheritdoc}
*/
public function run(): void
{
$active = self::$activeSignals;
\assert($active !== null);
foreach ($active as $event) {
$event->stop();
}
self::$activeSignals = &$this->signals;
foreach ($this->signals as $event) {
$event->start();
}
try {
parent::run();
} finally {
foreach ($this->signals as $event) {
$event->stop();
}
self::$activeSignals = &$active;
foreach ($active as $event) {
$event->start();
}
}
}
/**
* {@inheritdoc}
*/
public function stop(): void
{
$this->handle->stop();
parent::stop();
}
/**
* {@inheritdoc}
*/
public function getHandle(): \EvLoop
{
return $this->handle;
}
protected function now(): float
{
return (float) \hrtime(true) / 1_000_000_000;
}
/**
* {@inheritdoc}
*/
protected function dispatch(bool $blocking): void
{
$this->handle->run($blocking ? \Ev::RUN_ONCE : \Ev::RUN_ONCE | \Ev::RUN_NOWAIT);
}
/**
* {@inheritdoc}
*/
protected function activate(array $callbacks): void
{
$this->handle->nowUpdate();
$now = $this->now();
foreach ($callbacks as $callback) {
if (!isset($this->events[$id = $callback->id])) {
if ($callback instanceof StreamReadableCallback) {
\assert(\is_resource($callback->stream));
$this->events[$id] = $this->handle->io($callback->stream, \Ev::READ, $this->ioCallback, $callback);
} elseif ($callback instanceof StreamWritableCallback) {
\assert(\is_resource($callback->stream));
$this->events[$id] = $this->handle->io(
$callback->stream,
\Ev::WRITE,
$this->ioCallback,
$callback
);
} elseif ($callback instanceof TimerCallback) {
$interval = $callback->interval;
$this->events[$id] = $this->handle->timer(
\max(0, ($callback->expiration - $now)),
$callback->repeat ? $interval : 0,
$this->timerCallback,
$callback
);
} elseif ($callback instanceof SignalCallback) {
$this->events[$id] = $this->handle->signal($callback->signal, $this->signalCallback, $callback);
} else {
// @codeCoverageIgnoreStart
throw new \Error("Unknown callback type: " . \get_class($callback));
// @codeCoverageIgnoreEnd
}
} else {
$this->events[$id]->start();
}
if ($callback instanceof SignalCallback) {
/** @psalm-suppress PropertyTypeCoercion */
$this->signals[$id] = $this->events[$id];
}
}
}
protected function deactivate(DriverCallback $callback): void
{
if (isset($this->events[$id = $callback->id])) {
$this->events[$id]->stop();
if ($callback instanceof SignalCallback) {
unset($this->signals[$id]);
}
}
}
}

View File

@ -0,0 +1,243 @@
<?php
declare(strict_types=1);
/** @noinspection PhpComposerExtensionStubsInspection */
namespace Revolt\EventLoop\Driver;
use Revolt\EventLoop\Internal\AbstractDriver;
use Revolt\EventLoop\Internal\DriverCallback;
use Revolt\EventLoop\Internal\SignalCallback;
use Revolt\EventLoop\Internal\StreamCallback;
use Revolt\EventLoop\Internal\StreamReadableCallback;
use Revolt\EventLoop\Internal\StreamWritableCallback;
use Revolt\EventLoop\Internal\TimerCallback;
final class EventDriver extends AbstractDriver
{
/** @var array<string, \Event>|null */
private static ?array $activeSignals = null;
public static function isSupported(): bool
{
return \extension_loaded("event");
}
private \EventBase $handle;
/** @var array<string, \Event> */
private array $events = [];
private readonly \Closure $ioCallback;
private readonly \Closure $timerCallback;
private readonly \Closure $signalCallback;
/** @var array<string, \Event> */
private array $signals = [];
public function __construct()
{
parent::__construct();
/** @psalm-suppress TooFewArguments https://github.com/JetBrains/phpstorm-stubs/pull/763 */
$this->handle = new \EventBase();
if (self::$activeSignals === null) {
self::$activeSignals = &$this->signals;
}
$this->ioCallback = function ($resource, $what, StreamCallback $callback): void {
$this->enqueueCallback($callback);
};
$this->timerCallback = function ($resource, $what, TimerCallback $callback): void {
$this->enqueueCallback($callback);
};
$this->signalCallback = function ($signo, $what, SignalCallback $callback): void {
$this->enqueueCallback($callback);
};
}
/**
* {@inheritdoc}
*/
public function cancel(string $callbackId): void
{
parent::cancel($callbackId);
if (isset($this->events[$callbackId])) {
$this->events[$callbackId]->free();
unset($this->events[$callbackId]);
}
}
/**
* @codeCoverageIgnore
*/
public function __destruct()
{
foreach ($this->events as $event) {
if ($event !== null) { // Events may have been nulled in extension depending on destruct order.
$event->free();
}
}
// Unset here, otherwise $event->del() fails with a warning, because __destruct order isn't defined.
// See https://github.com/amphp/amp/issues/159.
$this->events = [];
// Manually free the loop handle to fully release loop resources.
// See https://github.com/amphp/amp/issues/177.
/** @psalm-suppress RedundantPropertyInitializationCheck */
if (isset($this->handle)) {
$this->handle->free();
unset($this->handle);
}
}
/**
* {@inheritdoc}
*/
public function run(): void
{
$active = self::$activeSignals;
\assert($active !== null);
foreach ($active as $event) {
$event->del();
}
self::$activeSignals = &$this->signals;
foreach ($this->signals as $event) {
/** @psalm-suppress TooFewArguments https://github.com/JetBrains/phpstorm-stubs/pull/763 */
$event->add();
}
try {
parent::run();
} finally {
foreach ($this->signals as $event) {
$event->del();
}
self::$activeSignals = &$active;
foreach ($active as $event) {
/** @psalm-suppress TooFewArguments https://github.com/JetBrains/phpstorm-stubs/pull/763 */
$event->add();
}
}
}
/**
* {@inheritdoc}
*/
public function stop(): void
{
$this->handle->stop();
parent::stop();
}
/**
* {@inheritdoc}
*/
public function getHandle(): \EventBase
{
return $this->handle;
}
protected function now(): float
{
return (float) \hrtime(true) / 1_000_000_000;
}
/**
* {@inheritdoc}
*/
protected function dispatch(bool $blocking): void
{
$this->handle->loop($blocking ? \EventBase::LOOP_ONCE : \EventBase::LOOP_ONCE | \EventBase::LOOP_NONBLOCK);
}
/**
* {@inheritdoc}
*/
protected function activate(array $callbacks): void
{
$now = $this->now();
foreach ($callbacks as $callback) {
if (!isset($this->events[$id = $callback->id])) {
if ($callback instanceof StreamReadableCallback) {
\assert(\is_resource($callback->stream));
$this->events[$id] = new \Event(
$this->handle,
$callback->stream,
\Event::READ | \Event::PERSIST,
$this->ioCallback,
$callback
);
} elseif ($callback instanceof StreamWritableCallback) {
\assert(\is_resource($callback->stream));
$this->events[$id] = new \Event(
$this->handle,
$callback->stream,
\Event::WRITE | \Event::PERSIST,
$this->ioCallback,
$callback
);
} elseif ($callback instanceof TimerCallback) {
$this->events[$id] = new \Event(
$this->handle,
-1,
\Event::TIMEOUT,
$this->timerCallback,
$callback
);
} elseif ($callback instanceof SignalCallback) {
$this->events[$id] = new \Event(
$this->handle,
$callback->signal,
\Event::SIGNAL | \Event::PERSIST,
$this->signalCallback,
$callback
);
} else {
// @codeCoverageIgnoreStart
throw new \Error("Unknown callback type");
// @codeCoverageIgnoreEnd
}
}
if ($callback instanceof TimerCallback) {
$interval = \min(\max(0, $callback->expiration - $now), \PHP_INT_MAX / 2);
$this->events[$id]->add($interval > 0 ? $interval : 0);
} elseif ($callback instanceof SignalCallback) {
$this->signals[$id] = $this->events[$id];
/** @psalm-suppress TooFewArguments https://github.com/JetBrains/phpstorm-stubs/pull/763 */
$this->events[$id]->add();
} else {
/** @psalm-suppress TooFewArguments https://github.com/JetBrains/phpstorm-stubs/pull/763 */
$this->events[$id]->add();
}
}
}
/**
* {@inheritdoc}
*/
protected function deactivate(DriverCallback $callback): void
{
if (isset($this->events[$id = $callback->id])) {
$this->events[$id]->del();
if ($callback instanceof SignalCallback) {
unset($this->signals[$id]);
}
}
}
}

View File

@ -0,0 +1,333 @@
<?php
declare(strict_types=1);
/** @noinspection PhpComposerExtensionStubsInspection */
namespace Revolt\EventLoop\Driver;
use Revolt\EventLoop\Internal\AbstractDriver;
use Revolt\EventLoop\Internal\DriverCallback;
use Revolt\EventLoop\Internal\SignalCallback;
use Revolt\EventLoop\Internal\StreamReadableCallback;
use Revolt\EventLoop\Internal\StreamWritableCallback;
use Revolt\EventLoop\Internal\TimerCallback;
use Revolt\EventLoop\Internal\TimerQueue;
use Revolt\EventLoop\UnsupportedFeatureException;
final class StreamSelectDriver extends AbstractDriver
{
/** @var array<int, resource> */
private array $readStreams = [];
/** @var array<int, array<string, StreamReadableCallback>> */
private array $readCallbacks = [];
/** @var array<int, resource> */
private array $writeStreams = [];
/** @var array<int, array<string, StreamWritableCallback>> */
private array $writeCallbacks = [];
private readonly TimerQueue $timerQueue;
/** @var array<int, array<string, SignalCallback>> */
private array $signalCallbacks = [];
/** @var \SplQueue<int> */
private readonly \SplQueue $signalQueue;
private bool $signalHandling;
private readonly \Closure $streamSelectErrorHandler;
private bool $streamSelectIgnoreResult = false;
public function __construct()
{
parent::__construct();
$this->signalQueue = new \SplQueue();
$this->timerQueue = new TimerQueue();
$this->signalHandling = \extension_loaded("pcntl")
&& \function_exists('pcntl_signal_dispatch')
&& \function_exists('pcntl_signal');
$this->streamSelectErrorHandler = function (int $errno, string $message): void {
// Casing changed in PHP 8 from 'unable' to 'Unable'
if (\stripos($message, "stream_select(): unable to select [4]: ") === 0) { // EINTR
$this->streamSelectIgnoreResult = true;
return;
}
if (\str_contains($message, 'FD_SETSIZE')) {
$message = \str_replace(["\r\n", "\n", "\r"], " ", $message);
$pattern = '(stream_select\(\): You MUST recompile PHP with a larger value of FD_SETSIZE. It is set to (\d+), but you have descriptors numbered at least as high as (\d+)\.)';
if (\preg_match($pattern, $message, $match)) {
$helpLink = 'https://revolt.run/extensions';
$message = 'You have reached the limits of stream_select(). It has a FD_SETSIZE of ' . $match[1]
. ', but you have file descriptors numbered at least as high as ' . $match[2] . '. '
. "You can install one of the extensions listed on {$helpLink} to support a higher number of "
. "concurrent file descriptors. If a large number of open file descriptors is unexpected, you "
. "might be leaking file descriptors that aren't closed correctly.";
}
}
throw new \Exception($message, $errno);
};
}
public function __destruct()
{
foreach ($this->signalCallbacks as $signalCallbacks) {
foreach ($signalCallbacks as $signalCallback) {
$this->deactivate($signalCallback);
}
}
}
/**
* @throws UnsupportedFeatureException If the pcntl extension is not available.
*/
public function onSignal(int $signal, \Closure $closure): string
{
if (!$this->signalHandling) {
throw new UnsupportedFeatureException("Signal handling requires the pcntl extension");
}
return parent::onSignal($signal, $closure);
}
public function getHandle(): mixed
{
return null;
}
protected function now(): float
{
return (float) \hrtime(true) / 1_000_000_000;
}
/**
* @throws \Throwable
*/
protected function dispatch(bool $blocking): void
{
if ($this->signalHandling) {
\pcntl_signal_dispatch();
while (!$this->signalQueue->isEmpty()) {
$signal = $this->signalQueue->dequeue();
foreach ($this->signalCallbacks[$signal] as $callback) {
$this->enqueueCallback($callback);
}
$blocking = false;
}
}
$this->selectStreams(
$this->readStreams,
$this->writeStreams,
$blocking ? $this->getTimeout() : 0.0
);
$now = $this->now();
while ($callback = $this->timerQueue->extract($now)) {
$this->enqueueCallback($callback);
}
}
protected function activate(array $callbacks): void
{
foreach ($callbacks as $callback) {
if ($callback instanceof StreamReadableCallback) {
\assert(\is_resource($callback->stream));
$streamId = (int) $callback->stream;
$this->readCallbacks[$streamId][$callback->id] = $callback;
$this->readStreams[$streamId] = $callback->stream;
} elseif ($callback instanceof StreamWritableCallback) {
\assert(\is_resource($callback->stream));
$streamId = (int) $callback->stream;
$this->writeCallbacks[$streamId][$callback->id] = $callback;
$this->writeStreams[$streamId] = $callback->stream;
} elseif ($callback instanceof TimerCallback) {
$this->timerQueue->insert($callback);
} elseif ($callback instanceof SignalCallback) {
if (!isset($this->signalCallbacks[$callback->signal])) {
\set_error_handler(static function (int $errno, string $errstr): bool {
throw new UnsupportedFeatureException(
\sprintf("Failed to register signal handler; Errno: %d; %s", $errno, $errstr)
);
});
// Avoid bug in Psalm handling of first-class callables by assigning to a temp variable.
$handler = $this->handleSignal(...);
try {
\pcntl_signal($callback->signal, $handler);
} finally {
\restore_error_handler();
}
}
$this->signalCallbacks[$callback->signal][$callback->id] = $callback;
} else {
// @codeCoverageIgnoreStart
throw new \Error("Unknown callback type");
// @codeCoverageIgnoreEnd
}
}
}
protected function deactivate(DriverCallback $callback): void
{
if ($callback instanceof StreamReadableCallback) {
$streamId = (int) $callback->stream;
unset($this->readCallbacks[$streamId][$callback->id]);
if (empty($this->readCallbacks[$streamId])) {
unset($this->readCallbacks[$streamId], $this->readStreams[$streamId]);
}
} elseif ($callback instanceof StreamWritableCallback) {
$streamId = (int) $callback->stream;
unset($this->writeCallbacks[$streamId][$callback->id]);
if (empty($this->writeCallbacks[$streamId])) {
unset($this->writeCallbacks[$streamId], $this->writeStreams[$streamId]);
}
} elseif ($callback instanceof TimerCallback) {
$this->timerQueue->remove($callback);
} elseif ($callback instanceof SignalCallback) {
if (isset($this->signalCallbacks[$callback->signal])) {
unset($this->signalCallbacks[$callback->signal][$callback->id]);
if (empty($this->signalCallbacks[$callback->signal])) {
unset($this->signalCallbacks[$callback->signal]);
\set_error_handler(static fn () => true);
try {
\pcntl_signal($callback->signal, \SIG_DFL);
} finally {
\restore_error_handler();
}
}
}
} else {
// @codeCoverageIgnoreStart
throw new \Error("Unknown callback type");
// @codeCoverageIgnoreEnd
}
}
/**
* @param array<int, resource> $read
* @param array<int, resource> $write
*/
private function selectStreams(array $read, array $write, float $timeout): void
{
if (!empty($read) || !empty($write)) { // Use stream_select() if there are any streams in the loop.
if ($timeout >= 0) {
$seconds = (int) $timeout;
$microseconds = (int) (($timeout - $seconds) * 1_000_000);
} else {
$seconds = null;
$microseconds = null;
}
// Failed connection attempts are indicated via except on Windows
// @link https://github.com/reactphp/event-loop/blob/8bd064ce23c26c4decf186c2a5a818c9a8209eb0/src/StreamSelectLoop.php#L279-L287
// @link https://docs.microsoft.com/de-de/windows/win32/api/winsock2/nf-winsock2-select
$except = null;
if (\DIRECTORY_SEPARATOR === '\\') {
$except = $write;
}
\set_error_handler($this->streamSelectErrorHandler);
try {
/** @psalm-suppress InvalidArgument */
$result = \stream_select($read, $write, $except, $seconds, $microseconds);
} finally {
\restore_error_handler();
}
if ($this->streamSelectIgnoreResult || $result === 0) {
$this->streamSelectIgnoreResult = false;
return;
}
if (!$result) {
throw new \Exception('Unknown error during stream_select');
}
foreach ($read as $stream) {
$streamId = (int) $stream;
if (!isset($this->readCallbacks[$streamId])) {
continue; // All read callbacks disabled.
}
foreach ($this->readCallbacks[$streamId] as $callback) {
$this->enqueueCallback($callback);
}
}
/** @var array<int, resource>|null $except */
if ($except !== null) {
foreach ($except as $key => $socket) {
$write[$key] = $socket;
}
}
foreach ($write as $stream) {
$streamId = (int) $stream;
if (!isset($this->writeCallbacks[$streamId])) {
continue; // All write callbacks disabled.
}
foreach ($this->writeCallbacks[$streamId] as $callback) {
$this->enqueueCallback($callback);
}
}
return;
}
if ($timeout < 0) { // Only signal callbacks are enabled, so sleep indefinitely.
/** @psalm-suppress ArgumentTypeCoercion */
\usleep(\PHP_INT_MAX);
return;
}
if ($timeout > 0) { // Sleep until next timer expires.
/** @psalm-suppress ArgumentTypeCoercion $timeout is positive here. */
\usleep((int) ($timeout * 1_000_000));
}
}
/**
* @return float Seconds until next timer expires or -1 if there are no pending timers.
*/
private function getTimeout(): float
{
$expiration = $this->timerQueue->peek();
if ($expiration === null) {
return -1;
}
$expiration -= $this->now();
return $expiration > 0 ? $expiration : 0.0;
}
private function handleSignal(int $signal): void
{
// Queue signals, so we don't suspend inside pcntl_signal_dispatch, which disables signals while it runs
$this->signalQueue->enqueue($signal);
}
}

View File

@ -0,0 +1,280 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop\Driver;
use Revolt\EventLoop\CallbackType;
use Revolt\EventLoop\Driver;
use Revolt\EventLoop\InvalidCallbackError;
use Revolt\EventLoop\Suspension;
final class TracingDriver implements Driver
{
private readonly Driver $driver;
/** @var array<string, true> */
private array $enabledCallbacks = [];
/** @var array<string, true> */
private array $unreferencedCallbacks = [];
/** @var array<string, string> */
private array $creationTraces = [];
/** @var array<string, string> */
private array $cancelTraces = [];
public function __construct(Driver $driver)
{
$this->driver = $driver;
}
public function run(): void
{
$this->driver->run();
}
public function stop(): void
{
$this->driver->stop();
}
public function getSuspension(): Suspension
{
return $this->driver->getSuspension();
}
public function isRunning(): bool
{
return $this->driver->isRunning();
}
public function defer(\Closure $closure): string
{
$id = $this->driver->defer(function (...$args) use ($closure) {
$this->cancel($args[0]);
return $closure(...$args);
});
$this->creationTraces[$id] = $this->formatStacktrace(\debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS));
$this->enabledCallbacks[$id] = true;
return $id;
}
public function delay(float $delay, \Closure $closure): string
{
$id = $this->driver->delay($delay, function (...$args) use ($closure) {
$this->cancel($args[0]);
return $closure(...$args);
});
$this->creationTraces[$id] = $this->formatStacktrace(\debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS));
$this->enabledCallbacks[$id] = true;
return $id;
}
public function repeat(float $interval, \Closure $closure): string
{
$id = $this->driver->repeat($interval, $closure);
$this->creationTraces[$id] = $this->formatStacktrace(\debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS));
$this->enabledCallbacks[$id] = true;
return $id;
}
public function onReadable(mixed $stream, \Closure $closure): string
{
$id = $this->driver->onReadable($stream, $closure);
$this->creationTraces[$id] = $this->formatStacktrace(\debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS));
$this->enabledCallbacks[$id] = true;
return $id;
}
public function onWritable(mixed $stream, \Closure $closure): string
{
$id = $this->driver->onWritable($stream, $closure);
$this->creationTraces[$id] = $this->formatStacktrace(\debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS));
$this->enabledCallbacks[$id] = true;
return $id;
}
public function onSignal(int $signal, \Closure $closure): string
{
$id = $this->driver->onSignal($signal, $closure);
$this->creationTraces[$id] = $this->formatStacktrace(\debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS));
$this->enabledCallbacks[$id] = true;
return $id;
}
public function enable(string $callbackId): string
{
try {
$this->driver->enable($callbackId);
$this->enabledCallbacks[$callbackId] = true;
} catch (InvalidCallbackError $e) {
$e->addInfo("Creation trace", $this->getCreationTrace($callbackId));
$e->addInfo("Cancellation trace", $this->getCancelTrace($callbackId));
throw $e;
}
return $callbackId;
}
public function cancel(string $callbackId): void
{
$this->driver->cancel($callbackId);
if (!isset($this->cancelTraces[$callbackId])) {
$this->cancelTraces[$callbackId] = $this->formatStacktrace(\debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS));
}
unset($this->enabledCallbacks[$callbackId], $this->unreferencedCallbacks[$callbackId]);
}
public function disable(string $callbackId): string
{
$this->driver->disable($callbackId);
unset($this->enabledCallbacks[$callbackId]);
return $callbackId;
}
public function reference(string $callbackId): string
{
try {
$this->driver->reference($callbackId);
unset($this->unreferencedCallbacks[$callbackId]);
} catch (InvalidCallbackError $e) {
$e->addInfo("Creation trace", $this->getCreationTrace($callbackId));
$e->addInfo("Cancellation trace", $this->getCancelTrace($callbackId));
throw $e;
}
return $callbackId;
}
public function unreference(string $callbackId): string
{
$this->driver->unreference($callbackId);
$this->unreferencedCallbacks[$callbackId] = true;
return $callbackId;
}
public function setErrorHandler(?\Closure $errorHandler): void
{
$this->driver->setErrorHandler($errorHandler);
}
public function getErrorHandler(): ?\Closure
{
return $this->driver->getErrorHandler();
}
/** @inheritdoc */
public function getHandle(): mixed
{
return $this->driver->getHandle();
}
public function dump(): string
{
$dump = "Enabled, referenced callbacks keeping the loop running: ";
foreach ($this->enabledCallbacks as $callbackId => $_) {
if (isset($this->unreferencedCallbacks[$callbackId])) {
continue;
}
$dump .= "Callback identifier: " . $callbackId . "\r\n";
$dump .= $this->getCreationTrace($callbackId);
$dump .= "\r\n\r\n";
}
return \rtrim($dump);
}
public function getIdentifiers(): array
{
return $this->driver->getIdentifiers();
}
public function getType(string $callbackId): CallbackType
{
return $this->driver->getType($callbackId);
}
public function isEnabled(string $callbackId): bool
{
return $this->driver->isEnabled($callbackId);
}
public function isReferenced(string $callbackId): bool
{
return $this->driver->isReferenced($callbackId);
}
public function __debugInfo(): array
{
return $this->driver->__debugInfo();
}
public function queue(\Closure $closure, mixed ...$args): void
{
$this->driver->queue($closure, ...$args);
}
private function getCreationTrace(string $callbackId): string
{
return $this->creationTraces[$callbackId] ?? 'No creation trace, yet.';
}
private function getCancelTrace(string $callbackId): string
{
return $this->cancelTraces[$callbackId] ?? 'No cancellation trace, yet.';
}
/**
* Formats a stacktrace obtained via `debug_backtrace()`.
*
* @param list<array{
* args?: list<mixed>,
* class?: class-string,
* file?: string,
* function: string,
* line?: int,
* object?: object,
* type?: string
* }> $trace Output of `debug_backtrace()`.
*
* @return string Formatted stacktrace.
*/
private function formatStacktrace(array $trace): string
{
return \implode("\n", \array_map(static function ($e, $i) {
$line = "#{$i} ";
if (isset($e["file"], $e['line'])) {
$line .= "{$e['file']}:{$e['line']} ";
}
if (isset($e["class"], $e["type"])) {
$line .= $e["class"] . $e["type"];
}
return $line . $e["function"] . "()";
}, $trace, \array_keys($trace)));
}
}

View File

@ -0,0 +1,261 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop\Driver;
use Revolt\EventLoop\Internal\AbstractDriver;
use Revolt\EventLoop\Internal\DriverCallback;
use Revolt\EventLoop\Internal\SignalCallback;
use Revolt\EventLoop\Internal\StreamCallback;
use Revolt\EventLoop\Internal\StreamReadableCallback;
use Revolt\EventLoop\Internal\StreamWritableCallback;
use Revolt\EventLoop\Internal\TimerCallback;
final class UvDriver extends AbstractDriver
{
public static function isSupported(): bool
{
return \extension_loaded("uv");
}
/** @var resource|\UVLoop A uv_loop resource created with uv_loop_new() */
private $handle;
/** @var array<string, resource> */
private array $events = [];
/** @var array<int, array<array-key, DriverCallback>> */
private array $uvCallbacks = [];
/** @var array<int, resource> */
private array $streams = [];
private readonly \Closure $ioCallback;
private readonly \Closure $timerCallback;
private readonly \Closure $signalCallback;
public function __construct()
{
parent::__construct();
$this->handle = \uv_loop_new();
$this->ioCallback = function ($event, $status, $events, $resource): void {
$callbacks = $this->uvCallbacks[(int) $event];
// Invoke the callback on errors, as this matches behavior with other loop back-ends.
// Re-enable callback as libuv disables the callback on non-zero status.
if ($status !== 0) {
$flags = 0;
foreach ($callbacks as $callback) {
\assert($callback instanceof StreamCallback);
$flags |= $callback->invokable ? $this->getStreamCallbackFlags($callback) : 0;
}
\uv_poll_start($event, $flags, $this->ioCallback);
}
foreach ($callbacks as $callback) {
\assert($callback instanceof StreamCallback);
// $events is ORed with 4 to trigger callback if no events are indicated (0) or on UV_DISCONNECT (4).
// http://docs.libuv.org/en/v1.x/poll.html
if (!($this->getStreamCallbackFlags($callback) & $events || ($events | 4) === 4)) {
continue;
}
$this->enqueueCallback($callback);
}
};
$this->timerCallback = function ($event): void {
$callback = $this->uvCallbacks[(int) $event][0];
\assert($callback instanceof TimerCallback);
$this->enqueueCallback($callback);
};
$this->signalCallback = function ($event): void {
$callback = $this->uvCallbacks[(int) $event][0];
$this->enqueueCallback($callback);
};
}
/**
* {@inheritdoc}
*/
public function cancel(string $callbackId): void
{
parent::cancel($callbackId);
if (!isset($this->events[$callbackId])) {
return;
}
$event = $this->events[$callbackId];
$eventId = (int) $event;
if (isset($this->uvCallbacks[$eventId][0])) { // All except IO callbacks.
unset($this->uvCallbacks[$eventId]);
} elseif (isset($this->uvCallbacks[$eventId][$callbackId])) {
$callback = $this->uvCallbacks[$eventId][$callbackId];
unset($this->uvCallbacks[$eventId][$callbackId]);
\assert($callback instanceof StreamCallback);
if (empty($this->uvCallbacks[$eventId])) {
unset($this->uvCallbacks[$eventId], $this->streams[(int) $callback->stream]);
}
}
unset($this->events[$callbackId]);
}
/**
* @return \UVLoop|resource
*/
public function getHandle(): mixed
{
return $this->handle;
}
protected function now(): float
{
\uv_update_time($this->handle);
/** @psalm-suppress TooManyArguments */
return \uv_now($this->handle) / 1000;
}
/**
* {@inheritdoc}
*/
protected function dispatch(bool $blocking): void
{
/** @psalm-suppress TooManyArguments */
\uv_run($this->handle, $blocking ? \UV::RUN_ONCE : \UV::RUN_NOWAIT);
}
/**
* {@inheritdoc}
*/
protected function activate(array $callbacks): void
{
$now = $this->now();
foreach ($callbacks as $callback) {
$id = $callback->id;
if ($callback instanceof StreamCallback) {
\assert(\is_resource($callback->stream));
$streamId = (int) $callback->stream;
if (isset($this->streams[$streamId])) {
$event = $this->streams[$streamId];
} elseif (isset($this->events[$id])) {
$event = $this->streams[$streamId] = $this->events[$id];
} else {
/** @psalm-suppress TooManyArguments */
$event = $this->streams[$streamId] = \uv_poll_init_socket($this->handle, $callback->stream);
}
$eventId = (int) $event;
$this->events[$id] = $event;
$this->uvCallbacks[$eventId][$id] = $callback;
$flags = 0;
foreach ($this->uvCallbacks[$eventId] as $w) {
\assert($w instanceof StreamCallback);
$flags |= $w->enabled ? ($this->getStreamCallbackFlags($w)) : 0;
}
\uv_poll_start($event, $flags, $this->ioCallback);
} elseif ($callback instanceof TimerCallback) {
if (isset($this->events[$id])) {
$event = $this->events[$id];
} else {
$event = $this->events[$id] = \uv_timer_init($this->handle);
}
$this->uvCallbacks[(int) $event] = [$callback];
\uv_timer_start(
$event,
(int) \min(\max(0, \ceil(($callback->expiration - $now) * 1000)), \PHP_INT_MAX),
$callback->repeat ? (int) \min(\max(0, \ceil($callback->interval * 1000)), \PHP_INT_MAX) : 0,
$this->timerCallback
);
} elseif ($callback instanceof SignalCallback) {
if (isset($this->events[$id])) {
$event = $this->events[$id];
} else {
/** @psalm-suppress TooManyArguments */
$event = $this->events[$id] = \uv_signal_init($this->handle);
}
$this->uvCallbacks[(int) $event] = [$callback];
/** @psalm-suppress TooManyArguments */
\uv_signal_start($event, $this->signalCallback, $callback->signal);
} else {
// @codeCoverageIgnoreStart
throw new \Error("Unknown callback type");
// @codeCoverageIgnoreEnd
}
}
}
/**
* {@inheritdoc}
*/
protected function deactivate(DriverCallback $callback): void
{
$id = $callback->id;
if (!isset($this->events[$id])) {
return;
}
$event = $this->events[$id];
if (!\uv_is_active($event)) {
return;
}
if ($callback instanceof StreamCallback) {
$flags = 0;
foreach ($this->uvCallbacks[(int) $event] as $w) {
\assert($w instanceof StreamCallback);
$flags |= $w->invokable ? ($this->getStreamCallbackFlags($w)) : 0;
}
if ($flags) {
\uv_poll_start($event, $flags, $this->ioCallback);
} else {
\uv_poll_stop($event);
}
} elseif ($callback instanceof TimerCallback) {
\uv_timer_stop($event);
} elseif ($callback instanceof SignalCallback) {
\uv_signal_stop($event);
} else {
// @codeCoverageIgnoreStart
throw new \Error("Unknown callback type");
// @codeCoverageIgnoreEnd
}
}
private function getStreamCallbackFlags(StreamCallback $callback): int
{
if ($callback instanceof StreamWritableCallback) {
return \UV::WRITABLE;
}
if ($callback instanceof StreamReadableCallback) {
return \UV::READABLE;
}
throw new \Error('Invalid callback type');
}
}

View File

@ -0,0 +1,83 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop;
// @codeCoverageIgnoreStart
use Revolt\EventLoop\Driver\EvDriver;
use Revolt\EventLoop\Driver\EventDriver;
use Revolt\EventLoop\Driver\StreamSelectDriver;
use Revolt\EventLoop\Driver\TracingDriver;
use Revolt\EventLoop\Driver\UvDriver;
final class DriverFactory
{
/**
* Creates a new loop instance and chooses the best available driver.
*
* @return Driver
*
* @throws \Error If an invalid class has been specified via REVOLT_LOOP_DRIVER
*/
public function create(): Driver
{
$driver = (function () {
if ($driver = $this->createDriverFromEnv()) {
return $driver;
}
if (UvDriver::isSupported()) {
return new UvDriver();
}
if (EvDriver::isSupported()) {
return new EvDriver();
}
if (EventDriver::isSupported()) {
return new EventDriver();
}
return new StreamSelectDriver();
})();
/** @psalm-suppress RiskyTruthyFalsyComparison */
if (\getenv("REVOLT_DRIVER_DEBUG_TRACE")) {
return new TracingDriver($driver);
}
return $driver;
}
/**
* @return Driver|null
*/
private function createDriverFromEnv(): ?Driver
{
$driver = \getenv("REVOLT_DRIVER");
/** @psalm-suppress RiskyTruthyFalsyComparison */
if (!$driver) {
return null;
}
if (!\class_exists($driver)) {
throw new \Error(\sprintf(
"Driver '%s' does not exist.",
$driver
));
}
if (!\is_subclass_of($driver, Driver::class)) {
throw new \Error(\sprintf(
"Driver '%s' is not a subclass of '%s'.",
$driver,
Driver::class
));
}
return new $driver();
}
}
// @codeCoverageIgnoreEnd

View File

@ -0,0 +1,82 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop;
/**
* Fiber local storage.
*
* Each instance stores data separately for each fiber. Usage examples include contextual logging data.
*
* @template T
*/
final class FiberLocal
{
/** @var \Fiber|null Dummy fiber for {main} */
private static ?\Fiber $mainFiber = null;
private static ?\WeakMap $localStorage = null;
public static function clear(): void
{
if (self::$localStorage === null) {
return;
}
$fiber = \Fiber::getCurrent() ?? self::$mainFiber;
if ($fiber === null) {
return;
}
unset(self::$localStorage[$fiber]);
}
private static function getFiberStorage(): \WeakMap
{
$fiber = \Fiber::getCurrent();
if ($fiber === null) {
$fiber = self::$mainFiber ??= new \Fiber(static function (): void {
// dummy fiber for main, as we need some object for the WeakMap
});
}
$localStorage = self::$localStorage ??= new \WeakMap();
return $localStorage[$fiber] ??= new \WeakMap();
}
/**
* @param \Closure():T $initializer
*/
public function __construct(private readonly \Closure $initializer)
{
}
/**
* @param T $value
*/
public function set(mixed $value): void
{
self::getFiberStorage()[$this] = [$value];
}
public function unset(): void
{
unset(self::getFiberStorage()[$this]);
}
/**
* @return T
*/
public function get(): mixed
{
$fiberStorage = self::getFiberStorage();
if (!isset($fiberStorage[$this])) {
$fiberStorage[$this] = [($this->initializer)()];
}
return $fiberStorage[$this][0];
}
}

View File

@ -0,0 +1,652 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop\Internal;
use Revolt\EventLoop\CallbackType;
use Revolt\EventLoop\Driver;
use Revolt\EventLoop\FiberLocal;
use Revolt\EventLoop\InvalidCallbackError;
use Revolt\EventLoop\Suspension;
use Revolt\EventLoop\UncaughtThrowable;
/**
* Event loop driver which implements all basic operations to allow interoperability.
*
* Callbacks (enabled or new callbacks) MUST immediately be marked as enabled, but only be activated (i.e. callbacks can
* be called) right before the next tick. Callbacks MUST NOT be called in the tick they were enabled.
*
* @internal
*/
abstract class AbstractDriver implements Driver
{
/** @var string Next callback identifier. */
private string $nextId = "a";
private \Fiber $fiber;
private \Fiber $callbackFiber;
private \Closure $errorCallback;
/** @var array<string, DriverCallback> */
private array $callbacks = [];
/** @var array<string, DriverCallback> */
private array $enableQueue = [];
/** @var array<string, DriverCallback> */
private array $enableDeferQueue = [];
/** @var null|\Closure(\Throwable):void */
private ?\Closure $errorHandler = null;
/** @var null|\Closure():mixed */
private ?\Closure $interrupt = null;
private readonly \Closure $interruptCallback;
private readonly \Closure $queueCallback;
/** @var \Closure():(null|\Closure(): mixed) */
private readonly \Closure $runCallback;
private readonly \stdClass $internalSuspensionMarker;
/** @var \SplQueue<array{\Closure, array}> */
private readonly \SplQueue $microtaskQueue;
/** @var \SplQueue<DriverCallback> */
private readonly \SplQueue $callbackQueue;
private bool $idle = false;
private bool $stopped = false;
/** @var \WeakMap<object, \WeakReference<DriverSuspension>> */
private \WeakMap $suspensions;
public function __construct()
{
if (\PHP_VERSION_ID < 80117 || \PHP_VERSION_ID >= 80200 && \PHP_VERSION_ID < 80204) {
// PHP GC is broken on early 8.1 and 8.2 versions, see https://github.com/php/php-src/issues/10496
/** @psalm-suppress RiskyTruthyFalsyComparison */
if (!\getenv('REVOLT_DRIVER_SUPPRESS_ISSUE_10496')) {
throw new \Error('Your version of PHP is affected by serious garbage collector bugs related to fibers. Please upgrade to a newer version of PHP, i.e. >= 8.1.17 or => 8.2.4');
}
}
$this->suspensions = new \WeakMap();
$this->internalSuspensionMarker = new \stdClass();
$this->microtaskQueue = new \SplQueue();
$this->callbackQueue = new \SplQueue();
$this->createLoopFiber();
$this->createCallbackFiber();
$this->createErrorCallback();
/** @psalm-suppress InvalidArgument */
$this->interruptCallback = $this->setInterrupt(...);
$this->queueCallback = $this->queue(...);
$this->runCallback = function (): ?\Closure {
do {
if ($this->fiber->isTerminated()) {
$this->createLoopFiber();
}
$result = $this->fiber->isStarted() ? $this->fiber->resume() : $this->fiber->start();
if ($result) { // Null indicates the loop fiber terminated without suspending.
return $result;
}
} while (\gc_collect_cycles() && !$this->stopped);
return null;
};
}
public function run(): void
{
if ($this->fiber->isRunning()) {
throw new \Error("The event loop is already running");
}
if (\Fiber::getCurrent()) {
throw new \Error(\sprintf("Can't call %s() within a fiber (i.e., outside of {main})", __METHOD__));
}
$lambda = ($this->runCallback)();
if ($lambda) {
$lambda();
throw new \Error(
'Interrupt from event loop must throw an exception: ' . ClosureHelper::getDescription($lambda)
);
}
}
public function stop(): void
{
$this->stopped = true;
}
public function isRunning(): bool
{
return $this->fiber->isRunning() || $this->fiber->isSuspended();
}
public function queue(\Closure $closure, mixed ...$args): void
{
$this->microtaskQueue->enqueue([$closure, $args]);
}
public function defer(\Closure $closure): string
{
$deferCallback = new DeferCallback($this->nextId++, $closure);
$this->callbacks[$deferCallback->id] = $deferCallback;
$this->enableDeferQueue[$deferCallback->id] = $deferCallback;
return $deferCallback->id;
}
public function delay(float $delay, \Closure $closure): string
{
if ($delay < 0) {
throw new \Error("Delay must be greater than or equal to zero");
}
$timerCallback = new TimerCallback($this->nextId++, $delay, $closure, $this->now() + $delay);
$this->callbacks[$timerCallback->id] = $timerCallback;
$this->enableQueue[$timerCallback->id] = $timerCallback;
return $timerCallback->id;
}
public function repeat(float $interval, \Closure $closure): string
{
if ($interval < 0) {
throw new \Error("Interval must be greater than or equal to zero");
}
$timerCallback = new TimerCallback($this->nextId++, $interval, $closure, $this->now() + $interval, true);
$this->callbacks[$timerCallback->id] = $timerCallback;
$this->enableQueue[$timerCallback->id] = $timerCallback;
return $timerCallback->id;
}
public function onReadable(mixed $stream, \Closure $closure): string
{
$streamCallback = new StreamReadableCallback($this->nextId++, $closure, $stream);
$this->callbacks[$streamCallback->id] = $streamCallback;
$this->enableQueue[$streamCallback->id] = $streamCallback;
return $streamCallback->id;
}
public function onWritable($stream, \Closure $closure): string
{
$streamCallback = new StreamWritableCallback($this->nextId++, $closure, $stream);
$this->callbacks[$streamCallback->id] = $streamCallback;
$this->enableQueue[$streamCallback->id] = $streamCallback;
return $streamCallback->id;
}
public function onSignal(int $signal, \Closure $closure): string
{
$signalCallback = new SignalCallback($this->nextId++, $closure, $signal);
$this->callbacks[$signalCallback->id] = $signalCallback;
$this->enableQueue[$signalCallback->id] = $signalCallback;
return $signalCallback->id;
}
public function enable(string $callbackId): string
{
if (!isset($this->callbacks[$callbackId])) {
throw InvalidCallbackError::invalidIdentifier($callbackId);
}
$callback = $this->callbacks[$callbackId];
if ($callback->enabled) {
return $callbackId; // Callback already enabled.
}
$callback->enabled = true;
if ($callback instanceof DeferCallback) {
$this->enableDeferQueue[$callback->id] = $callback;
} elseif ($callback instanceof TimerCallback) {
$callback->expiration = $this->now() + $callback->interval;
$this->enableQueue[$callback->id] = $callback;
} else {
$this->enableQueue[$callback->id] = $callback;
}
return $callbackId;
}
public function cancel(string $callbackId): void
{
$this->disable($callbackId);
unset($this->callbacks[$callbackId]);
}
public function disable(string $callbackId): string
{
if (!isset($this->callbacks[$callbackId])) {
return $callbackId;
}
$callback = $this->callbacks[$callbackId];
if (!$callback->enabled) {
return $callbackId; // Callback already disabled.
}
$callback->enabled = false;
$callback->invokable = false;
$id = $callback->id;
if ($callback instanceof DeferCallback) {
// Callback was only queued to be enabled.
unset($this->enableDeferQueue[$id]);
} elseif (isset($this->enableQueue[$id])) {
// Callback was only queued to be enabled.
unset($this->enableQueue[$id]);
} else {
$this->deactivate($callback);
}
return $callbackId;
}
public function reference(string $callbackId): string
{
if (!isset($this->callbacks[$callbackId])) {
throw InvalidCallbackError::invalidIdentifier($callbackId);
}
$this->callbacks[$callbackId]->referenced = true;
return $callbackId;
}
public function unreference(string $callbackId): string
{
if (!isset($this->callbacks[$callbackId])) {
return $callbackId;
}
$this->callbacks[$callbackId]->referenced = false;
return $callbackId;
}
public function getSuspension(): Suspension
{
$fiber = \Fiber::getCurrent();
// User callbacks are always executed outside the event loop fiber, so this should always be false.
\assert($fiber !== $this->fiber);
// Use queue closure in case of {main}, which can be unset by DriverSuspension after an uncaught exception.
$key = $fiber ?? $this->queueCallback;
$suspension = ($this->suspensions[$key] ?? null)?->get();
if ($suspension) {
return $suspension;
}
$suspension = new DriverSuspension(
$this->runCallback,
$this->queueCallback,
$this->interruptCallback,
$this->suspensions,
);
$this->suspensions[$key] = \WeakReference::create($suspension);
return $suspension;
}
public function setErrorHandler(?\Closure $errorHandler): void
{
$this->errorHandler = $errorHandler;
}
public function getErrorHandler(): ?\Closure
{
return $this->errorHandler;
}
public function __debugInfo(): array
{
// @codeCoverageIgnoreStart
return \array_map(fn (DriverCallback $callback) => [
'type' => $this->getType($callback->id),
'enabled' => $callback->enabled,
'referenced' => $callback->referenced,
], $this->callbacks);
// @codeCoverageIgnoreEnd
}
public function getIdentifiers(): array
{
return \array_keys($this->callbacks);
}
public function getType(string $callbackId): CallbackType
{
$callback = $this->callbacks[$callbackId] ?? throw InvalidCallbackError::invalidIdentifier($callbackId);
return match ($callback::class) {
DeferCallback::class => CallbackType::Defer,
TimerCallback::class => $callback->repeat ? CallbackType::Repeat : CallbackType::Delay,
StreamReadableCallback::class => CallbackType::Readable,
StreamWritableCallback::class => CallbackType::Writable,
SignalCallback::class => CallbackType::Signal,
};
}
public function isEnabled(string $callbackId): bool
{
$callback = $this->callbacks[$callbackId] ?? throw InvalidCallbackError::invalidIdentifier($callbackId);
return $callback->enabled;
}
public function isReferenced(string $callbackId): bool
{
$callback = $this->callbacks[$callbackId] ?? throw InvalidCallbackError::invalidIdentifier($callbackId);
return $callback->referenced;
}
/**
* Activates (enables) all the given callbacks.
*/
abstract protected function activate(array $callbacks): void;
/**
* Dispatches any pending read/write, timer, and signal events.
*/
abstract protected function dispatch(bool $blocking): void;
/**
* Deactivates (disables) the given callback.
*/
abstract protected function deactivate(DriverCallback $callback): void;
final protected function enqueueCallback(DriverCallback $callback): void
{
$this->callbackQueue->enqueue($callback);
$this->idle = false;
}
/**
* Invokes the error handler with the given exception.
*
* @param \Throwable $exception The exception thrown from an event callback.
*/
final protected function error(\Closure $closure, \Throwable $exception): void
{
if ($this->errorHandler === null) {
// Explicitly override the previous interrupt if it exists in this case, hiding the exception is worse
$this->interrupt = static fn () => $exception instanceof UncaughtThrowable
? throw $exception
: throw UncaughtThrowable::throwingCallback($closure, $exception);
return;
}
$fiber = new \Fiber($this->errorCallback);
/** @noinspection PhpUnhandledExceptionInspection */
$fiber->start($this->errorHandler, $exception);
}
/**
* Returns the current event loop time in second increments.
*
* Note this value does not necessarily correlate to wall-clock time, rather the value returned is meant to be used
* in relative comparisons to prior values returned by this method (intervals, expiration calculations, etc.).
*/
abstract protected function now(): float;
private function invokeMicrotasks(): void
{
while (!$this->microtaskQueue->isEmpty()) {
[$callback, $args] = $this->microtaskQueue->dequeue();
try {
// Clear $args to allow garbage collection
$callback(...$args, ...($args = []));
} catch (\Throwable $exception) {
$this->error($callback, $exception);
} finally {
FiberLocal::clear();
}
unset($callback, $args);
if ($this->interrupt) {
/** @noinspection PhpUnhandledExceptionInspection */
\Fiber::suspend($this->internalSuspensionMarker);
}
}
}
/**
* @return bool True if no enabled and referenced callbacks remain in the loop.
*/
private function isEmpty(): bool
{
foreach ($this->callbacks as $callback) {
if ($callback->enabled && $callback->referenced) {
return false;
}
}
return true;
}
/**
* Executes a single tick of the event loop.
*/
private function tick(bool $previousIdle): void
{
$this->activate($this->enableQueue);
foreach ($this->enableQueue as $callback) {
$callback->invokable = true;
}
$this->enableQueue = [];
foreach ($this->enableDeferQueue as $callback) {
$callback->invokable = true;
$this->enqueueCallback($callback);
}
$this->enableDeferQueue = [];
$blocking = $previousIdle
&& !$this->stopped
&& !$this->isEmpty();
if ($blocking) {
$this->invokeCallbacks();
/** @psalm-suppress TypeDoesNotContainType */
if (!empty($this->enableDeferQueue) || !empty($this->enableQueue)) {
$blocking = false;
}
}
/** @psalm-suppress RedundantCondition */
$this->dispatch($blocking);
}
private function invokeCallbacks(): void
{
while (!$this->microtaskQueue->isEmpty() || !$this->callbackQueue->isEmpty()) {
/** @noinspection PhpUnhandledExceptionInspection */
$yielded = $this->callbackFiber->isStarted()
? $this->callbackFiber->resume()
: $this->callbackFiber->start();
if ($yielded !== $this->internalSuspensionMarker) {
$this->createCallbackFiber();
}
if ($this->interrupt) {
$this->invokeInterrupt();
}
}
}
/**
* @param \Closure():mixed $interrupt
*/
private function setInterrupt(\Closure $interrupt): void
{
\assert($this->interrupt === null);
$this->interrupt = $interrupt;
}
private function invokeInterrupt(): void
{
\assert($this->interrupt !== null);
$interrupt = $this->interrupt;
$this->interrupt = null;
/** @noinspection PhpUnhandledExceptionInspection */
\Fiber::suspend($interrupt);
}
private function createLoopFiber(): void
{
$this->fiber = new \Fiber(function (): void {
$this->stopped = false;
// Invoke microtasks if we have some
$this->invokeCallbacks();
/** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */
while (!$this->stopped) {
if ($this->interrupt) {
$this->invokeInterrupt();
}
if ($this->isEmpty()) {
return;
}
$previousIdle = $this->idle;
$this->idle = true;
$this->tick($previousIdle);
$this->invokeCallbacks();
}
});
}
private function createCallbackFiber(): void
{
$this->callbackFiber = new \Fiber(function (): void {
do {
$this->invokeMicrotasks();
while (!$this->callbackQueue->isEmpty()) {
/** @var DriverCallback $callback */
$callback = $this->callbackQueue->dequeue();
if (!isset($this->callbacks[$callback->id]) || !$callback->invokable) {
unset($callback);
continue;
}
if ($callback instanceof DeferCallback) {
$this->cancel($callback->id);
} elseif ($callback instanceof TimerCallback) {
if (!$callback->repeat) {
$this->cancel($callback->id);
} else {
// Disable and re-enable, so it's not executed repeatedly in the same tick
// See https://github.com/amphp/amp/issues/131
$this->disable($callback->id);
$this->enable($callback->id);
}
}
try {
$result = match (true) {
$callback instanceof StreamCallback => ($callback->closure)(
$callback->id,
$callback->stream
),
$callback instanceof SignalCallback => ($callback->closure)(
$callback->id,
$callback->signal
),
default => ($callback->closure)($callback->id),
};
if ($result !== null) {
throw InvalidCallbackError::nonNullReturn($callback->id, $callback->closure);
}
} catch (\Throwable $exception) {
$this->error($callback->closure, $exception);
} finally {
FiberLocal::clear();
}
unset($callback);
if ($this->interrupt) {
/** @noinspection PhpUnhandledExceptionInspection */
\Fiber::suspend($this->internalSuspensionMarker);
}
$this->invokeMicrotasks();
}
/** @noinspection PhpUnhandledExceptionInspection */
\Fiber::suspend($this->internalSuspensionMarker);
} while (true);
});
}
private function createErrorCallback(): void
{
$this->errorCallback = function (\Closure $errorHandler, \Throwable $exception): void {
try {
$errorHandler($exception);
} catch (\Throwable $exception) {
$this->interrupt = static fn () => $exception instanceof UncaughtThrowable
? throw $exception
: throw UncaughtThrowable::throwingErrorHandler($errorHandler, $exception);
}
};
}
final public function __serialize(): never
{
throw new \Error(__CLASS__ . ' does not support serialization');
}
final public function __unserialize(array $data): never
{
throw new \Error(__CLASS__ . ' does not support deserialization');
}
}

View File

@ -0,0 +1,30 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop\Internal;
/** @internal */
final class ClosureHelper
{
public static function getDescription(\Closure $closure): string
{
try {
$reflection = new \ReflectionFunction($closure);
$description = $reflection->name;
if ($scopeClass = $reflection->getClosureScopeClass()) {
$description = $scopeClass->name . '::' . $description;
}
if ($reflection->getFileName() !== false && $reflection->getStartLine()) {
$description .= " defined in " . $reflection->getFileName() . ':' . $reflection->getStartLine();
}
return $description;
} catch (\ReflectionException) {
return '???';
}
}
}

View File

@ -0,0 +1,10 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop\Internal;
/** @internal */
final class DeferCallback extends DriverCallback
{
}

View File

@ -0,0 +1,40 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop\Internal;
/**
* @internal
*/
abstract class DriverCallback
{
public bool $invokable = false;
public bool $enabled = true;
public bool $referenced = true;
public function __construct(
public readonly string $id,
public readonly \Closure $closure
) {
}
/**
* @param string $property
*/
public function __get(string $property): never
{
throw new \Error("Unknown property '{$property}'");
}
/**
* @param string $property
* @param mixed $value
*/
public function __set(string $property, mixed $value): never
{
throw new \Error("Unknown property '{$property}'");
}
}

View File

@ -0,0 +1,196 @@
<?php
/** @noinspection PhpPropertyOnlyWrittenInspection */
declare(strict_types=1);
namespace Revolt\EventLoop\Internal;
use Revolt\EventLoop\Suspension;
/**
* @internal
*
* @template T
* @implements Suspension<T>
*/
final class DriverSuspension implements Suspension
{
private ?\Fiber $suspendedFiber = null;
/** @var \WeakReference<\Fiber>|null */
private readonly ?\WeakReference $fiberRef;
private ?\Error $error = null;
private bool $pending = false;
private bool $deadMain = false;
/**
* @param \WeakMap<object, \WeakReference<DriverSuspension>> $suspensions
*/
public function __construct(
private readonly \Closure $run,
private readonly \Closure $queue,
private readonly \Closure $interrupt,
private readonly \WeakMap $suspensions,
) {
$fiber = \Fiber::getCurrent();
$this->fiberRef = $fiber ? \WeakReference::create($fiber) : null;
}
public function resume(mixed $value = null): void
{
// Ignore spurious resumes to old dead {main} suspension
if ($this->deadMain) {
return;
}
if (!$this->pending) {
throw $this->error ?? new \Error('Must call suspend() before calling resume()');
}
$this->pending = false;
/** @var \Fiber|null $fiber */
$fiber = $this->fiberRef?->get();
if ($fiber) {
($this->queue)(static function () use ($fiber, $value): void {
// The fiber may be destroyed with suspension as part of the GC cycle collector.
if (!$fiber->isTerminated()) {
$fiber->resume($value);
}
});
} else {
// Suspend event loop fiber to {main}.
($this->interrupt)(static fn () => $value);
}
}
public function suspend(): mixed
{
// Throw exception when trying to use old dead {main} suspension
if ($this->deadMain) {
throw new \Error(
'Suspension cannot be suspended after an uncaught exception is thrown from the event loop',
);
}
if ($this->pending) {
throw new \Error('Must call resume() or throw() before calling suspend() again');
}
$fiber = $this->fiberRef?->get();
if ($fiber !== \Fiber::getCurrent()) {
throw new \Error('Must not call suspend() from another fiber');
}
$this->pending = true;
$this->error = null;
// Awaiting from within a fiber.
if ($fiber) {
$this->suspendedFiber = $fiber;
try {
$value = \Fiber::suspend();
$this->suspendedFiber = null;
} catch (\FiberError $error) {
$this->pending = false;
$this->suspendedFiber = null;
$this->error = $error;
throw $error;
}
// Setting $this->suspendedFiber = null in finally will set the fiber to null if a fiber is destroyed
// as part of a cycle collection, causing an error if the suspension is subsequently resumed.
return $value;
}
// Awaiting from {main}.
$result = ($this->run)();
/** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */
if ($this->pending) {
// This is now a dead {main} suspension.
$this->deadMain = true;
// Unset suspension for {main} using queue closure.
unset($this->suspensions[$this->queue]);
$result && $result(); // Unwrap any uncaught exceptions from the event loop
\gc_collect_cycles(); // Collect any circular references before dumping pending suspensions.
$info = '';
foreach ($this->suspensions as $suspensionRef) {
if ($suspension = $suspensionRef->get()) {
\assert($suspension instanceof self);
$fiber = $suspension->fiberRef?->get();
if ($fiber === null) {
continue;
}
$reflectionFiber = new \ReflectionFiber($fiber);
$info .= "\n\n" . $this->formatStacktrace($reflectionFiber->getTrace(\DEBUG_BACKTRACE_IGNORE_ARGS));
}
}
throw new \Error('Event loop terminated without resuming the current suspension (the cause is either a fiber deadlock, or an incorrectly unreferenced/canceled watcher):' . $info);
}
return $result();
}
public function throw(\Throwable $throwable): void
{
// Ignore spurious resumes to old dead {main} suspension
if ($this->deadMain) {
return;
}
if (!$this->pending) {
throw $this->error ?? new \Error('Must call suspend() before calling throw()');
}
$this->pending = false;
/** @var \Fiber|null $fiber */
$fiber = $this->fiberRef?->get();
if ($fiber) {
($this->queue)(static function () use ($fiber, $throwable): void {
// The fiber may be destroyed with suspension as part of the GC cycle collector.
if (!$fiber->isTerminated()) {
$fiber->throw($throwable);
}
});
} else {
// Suspend event loop fiber to {main}.
($this->interrupt)(static fn () => throw $throwable);
}
}
private function formatStacktrace(array $trace): string
{
return \implode("\n", \array_map(static function ($e, $i) {
$line = "#{$i} ";
if (isset($e["file"])) {
$line .= "{$e['file']}:{$e['line']} ";
}
if (isset($e["class"], $e["type"])) {
$line .= $e["class"] . $e["type"];
}
return $line . $e["function"] . "()";
}, $trace, \array_keys($trace)));
}
}

View File

@ -0,0 +1,17 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop\Internal;
/** @internal */
final class SignalCallback extends DriverCallback
{
public function __construct(
string $id,
\Closure $closure,
public readonly int $signal
) {
parent::__construct($id, $closure);
}
}

View File

@ -0,0 +1,20 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop\Internal;
/** @internal */
abstract class StreamCallback extends DriverCallback
{
/**
* @param resource $stream
*/
public function __construct(
string $id,
\Closure $closure,
public readonly mixed $stream
) {
parent::__construct($id, $closure);
}
}

View File

@ -0,0 +1,10 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop\Internal;
/** @internal */
final class StreamReadableCallback extends StreamCallback
{
}

View File

@ -0,0 +1,10 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop\Internal;
/** @internal */
final class StreamWritableCallback extends StreamCallback
{
}

View File

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop\Internal;
/** @internal */
final class TimerCallback extends DriverCallback
{
public function __construct(
string $id,
public readonly float $interval,
\Closure $callback,
public float $expiration,
public readonly bool $repeat = false
) {
parent::__construct($id, $callback);
}
}

View File

@ -0,0 +1,156 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop\Internal;
/**
* Uses a binary tree stored in an array to implement a heap.
*
* @internal
*/
final class TimerQueue
{
/** @var array<int, TimerCallback> */
private array $callbacks = [];
/** @var array<string, int> */
private array $pointers = [];
/**
* Inserts the callback into the queue.
*
* Time complexity: O(log(n)).
*/
public function insert(TimerCallback $callback): void
{
\assert(!isset($this->pointers[$callback->id]));
$node = \count($this->callbacks);
$this->callbacks[$node] = $callback;
$this->pointers[$callback->id] = $node;
$this->heapifyUp($node);
}
/**
* Removes the given callback from the queue.
*
* Time complexity: O(log(n)).
*/
public function remove(TimerCallback $callback): void
{
$id = $callback->id;
if (!isset($this->pointers[$id])) {
return;
}
$this->removeAndRebuild($this->pointers[$id]);
}
/**
* Deletes and returns the callback on top of the heap if it has expired, otherwise null is returned.
*
* Time complexity: O(log(n)).
*
* @param float $now Current event loop time.
*
* @return TimerCallback|null Expired callback at the top of the heap or null if the callback has not expired.
*/
public function extract(float $now): ?TimerCallback
{
if (!$this->callbacks) {
return null;
}
$callback = $this->callbacks[0];
if ($callback->expiration > $now) {
return null;
}
$this->removeAndRebuild(0);
return $callback;
}
/**
* Returns the expiration time value at the top of the heap.
*
* Time complexity: O(1).
*
* @return float|null Expiration time of the callback at the top of the heap or null if the heap is empty.
*/
public function peek(): ?float
{
return isset($this->callbacks[0]) ? $this->callbacks[0]->expiration : null;
}
/**
* @param int $node Rebuild the data array from the given node upward.
*/
private function heapifyUp(int $node): void
{
$entry = $this->callbacks[$node];
while ($node !== 0 && $entry->expiration < $this->callbacks[$parent = ($node - 1) >> 1]->expiration) {
$this->swap($node, $parent);
$node = $parent;
}
}
/**
* @param int $node Rebuild the data array from the given node downward.
*/
private function heapifyDown(int $node): void
{
$length = \count($this->callbacks);
while (($child = ($node << 1) + 1) < $length) {
if ($this->callbacks[$child]->expiration < $this->callbacks[$node]->expiration
&& ($child + 1 >= $length || $this->callbacks[$child]->expiration < $this->callbacks[$child + 1]->expiration)
) {
// Left child is less than parent and right child.
$swap = $child;
} elseif ($child + 1 < $length && $this->callbacks[$child + 1]->expiration < $this->callbacks[$node]->expiration) {
// Right child is less than parent and left child.
$swap = $child + 1;
} else { // Left and right child are greater than parent.
break;
}
$this->swap($node, $swap);
$node = $swap;
}
}
private function swap(int $left, int $right): void
{
$temp = $this->callbacks[$left];
$this->callbacks[$left] = $this->callbacks[$right];
$this->pointers[$this->callbacks[$right]->id] = $left;
$this->callbacks[$right] = $temp;
$this->pointers[$temp->id] = $right;
}
/**
* @param int $node Remove the given node and then rebuild the data array.
*/
private function removeAndRebuild(int $node): void
{
$length = \count($this->callbacks) - 1;
$id = $this->callbacks[$node]->id;
$left = $this->callbacks[$node] = $this->callbacks[$length];
$this->pointers[$left->id] = $node;
unset($this->callbacks[$length], $this->pointers[$id]);
if ($node < $length) { // don't need to do anything if we removed the last element
$parent = ($node - 1) >> 1;
if ($parent >= 0 && $this->callbacks[$node]->expiration < $this->callbacks[$parent]->expiration) {
$this->heapifyUp($node);
} else {
$this->heapifyDown($node);
}
}
}
}

View File

@ -0,0 +1,77 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop;
use Revolt\EventLoop\Internal\ClosureHelper;
final class InvalidCallbackError extends \Error
{
public const E_NONNULL_RETURN = 1;
public const E_INVALID_IDENTIFIER = 2;
/**
* MUST be thrown if any callback returns a non-null value.
*/
public static function nonNullReturn(string $callbackId, \Closure $closure): self
{
return new self(
$callbackId,
self::E_NONNULL_RETURN,
'Non-null return value received from callback ' . ClosureHelper::getDescription($closure)
);
}
/**
* MUST be thrown if any operation (except disable() and cancel()) is attempted with an invalid callback identifier.
*
* An invalid callback identifier is any identifier that is not yet emitted by the driver or cancelled by the user.
*/
public static function invalidIdentifier(string $callbackId): self
{
return new self($callbackId, self::E_INVALID_IDENTIFIER, 'Invalid callback identifier ' . $callbackId);
}
/** @var string */
private readonly string $rawMessage;
/** @var string */
private readonly string $callbackId;
/** @var array<string, string> */
private array $info = [];
/**
* @param string $callbackId The callback identifier.
* @param string $message The exception message.
*/
private function __construct(string $callbackId, int $code, string $message)
{
parent::__construct($message, $code);
$this->callbackId = $callbackId;
$this->rawMessage = $message;
}
/**
* @return string The callback identifier.
*/
public function getCallbackId(): string
{
return $this->callbackId;
}
public function addInfo(string $key, string $message): void
{
$this->info[$key] = $message;
$info = '';
foreach ($this->info as $infoKey => $infoMessage) {
$info .= "\r\n\r\n" . $infoKey . ': ' . $infoMessage;
}
$this->message = $this->rawMessage . $info;
}
}

View File

@ -0,0 +1,43 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop;
/**
* Should be used to run and suspend the event loop instead of directly interacting with fibers.
*
* **Example**
*
* ```php
* $suspension = EventLoop::getSuspension();
*
* $promise->then(
* fn (mixed $value) => $suspension->resume($value),
* fn (Throwable $error) => $suspension->throw($error)
* );
*
* $suspension->suspend();
* ```
*
* @template T
*/
interface Suspension
{
/**
* @param T $value The value to return from the call to {@see suspend()}.
*/
public function resume(mixed $value = null): void;
/**
* Returns the value provided to {@see resume()} or throws the exception provided to {@see throw()}.
*
* @return T
*/
public function suspend(): mixed;
/**
* Throws the given exception from the call to {@see suspend()}.
*/
public function throw(\Throwable $throwable): void;
}

View File

@ -0,0 +1,34 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop;
use Revolt\EventLoop\Internal\ClosureHelper;
final class UncaughtThrowable extends \Error
{
public static function throwingCallback(\Closure $closure, \Throwable $previous): self
{
return new self(
"Uncaught %s thrown in event loop callback %s; use Revolt\EventLoop::setErrorHandler() to gracefully handle such exceptions%s",
$closure,
$previous
);
}
public static function throwingErrorHandler(\Closure $closure, \Throwable $previous): self
{
return new self("Uncaught %s thrown in event loop error handler %s%s", $closure, $previous);
}
private function __construct(string $message, \Closure $closure, \Throwable $previous)
{
parent::__construct(\sprintf(
$message,
\str_replace("\0", '@', \get_class($previous)), // replace NUL-byte in anonymous class name
ClosureHelper::getDescription($closure),
$previous->getMessage() !== '' ? ': ' . $previous->getMessage() : ''
), 0, $previous);
}
}

View File

@ -0,0 +1,14 @@
<?php
declare(strict_types=1);
namespace Revolt\EventLoop;
/**
* MUST be thrown if a feature is not supported by the system.
*
* This might happen if ext-pcntl is missing and the loop driver doesn't support another way to dispatch signals.
*/
final class UnsupportedFeatureException extends \Exception
{
}