From 39bcbda5d037f6bb7a0057e3790bf2952120f86c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20=C5=A0tancl?= Date: Fri, 27 Sep 2024 23:02:03 +0200 Subject: [PATCH] parallel commands: core # autodetect, bugfixes, improved output --- .gitattributes | 1 + src/Commands/Migrate.php | 25 +++++++-- src/Commands/MigrateFresh.php | 8 ++- src/Commands/Rollback.php | 26 ++++++++-- src/Concerns/ParallelCommand.php | 89 ++++++++++++++++++++++++++++---- typedefs/FFI.php | 8 +++ 6 files changed, 138 insertions(+), 19 deletions(-) create mode 100644 typedefs/FFI.php diff --git a/.gitattributes b/.gitattributes index e0804500..3736c54d 100644 --- a/.gitattributes +++ b/.gitattributes @@ -22,3 +22,4 @@ /t export-ignore /test export-ignore /tests export-ignore +/typedefs export-ignore diff --git a/src/Commands/Migrate.php b/src/Commands/Migrate.php index c3ba37e4..ab428546 100644 --- a/src/Commands/Migrate.php +++ b/src/Commands/Migrate.php @@ -16,6 +16,7 @@ use Stancl\Tenancy\Concerns\ParallelCommand; use Stancl\Tenancy\Database\Exceptions\TenantDatabaseDoesNotExistException; use Stancl\Tenancy\Events\DatabaseMigrated; use Stancl\Tenancy\Events\MigratingDatabase; +use Symfony\Component\Console\Output\OutputInterface as OI; class Migrate extends MigrateCommand { @@ -52,7 +53,7 @@ class Migrate extends MigrateCommand if ($this->getProcesses() > 1) { return $this->runConcurrently($this->getTenantChunks()->map(function ($chunk) { - return $this->getTenants($chunk->all()); + return $this->getTenants($chunk); })); } @@ -80,9 +81,25 @@ class Migrate extends MigrateCommand $tenant->run(function ($tenant) use (&$success) { event(new MigratingDatabase($tenant)); - // Migrate - if (parent::handle() !== 0) { - $success = false; + $verbosity = (int) $this->output->getVerbosity(); + + if ($this->runningConcurrently) { + // The output gets messy when multiple processes are writing to the same stdout + $this->output->setVerbosity(OI::VERBOSITY_QUIET); + } + + try { + // Migrate + if (parent::handle() !== 0) { + $success = false; + } + } finally { + $this->output->setVerbosity($verbosity); + } + + if ($this->runningConcurrently) { + // todo@cli the Migrating info above always has extra spaces, and the success below does WHEN there is work that got done by the block above. same in Rollback + $this->components->success("Migrated tenant {$tenant->getTenantKey()}"); } event(new DatabaseMigrated($tenant)); diff --git a/src/Commands/MigrateFresh.php b/src/Commands/MigrateFresh.php index 02e4c189..4e89cefd 100644 --- a/src/Commands/MigrateFresh.php +++ b/src/Commands/MigrateFresh.php @@ -38,7 +38,7 @@ class MigrateFresh extends BaseCommand if ($this->getProcesses() > 1) { return $this->runConcurrently($this->getTenantChunks()->map(function ($chunk) { - return $this->getTenants($chunk->all()); + return $this->getTenants($chunk); })); } @@ -81,6 +81,8 @@ class MigrateFresh extends BaseCommand } /** + * Only used when running concurrently. + * * @param LazyCollection $tenants */ protected function migrateFreshTenants(LazyCollection $tenants): bool @@ -89,6 +91,8 @@ class MigrateFresh extends BaseCommand foreach ($tenants as $tenant) { try { + $this->components->info("Migrating (fresh) tenant {$tenant->getTenantKey()}"); + $tenant->run(function ($tenant) use (&$success) { $this->components->info("Wiping database of tenant {$tenant->getTenantKey()}", OI::VERBOSITY_VERY_VERBOSE); if ($this->wipeDB()) { @@ -105,6 +109,8 @@ class MigrateFresh extends BaseCommand $success = false; $this->components->error("Migrating database of tenant {$tenant->getTenantKey()} failed!"); } + + $this->components->success("Migrated (fresh) tenant {$tenant->getTenantKey()}"); }); } catch (TenantDatabaseDoesNotExistException|QueryException $e) { $this->components->error("Migration failed for tenant {$tenant->getTenantKey()}: {$e->getMessage()}"); diff --git a/src/Commands/Rollback.php b/src/Commands/Rollback.php index 7ea01f08..e7018a5a 100644 --- a/src/Commands/Rollback.php +++ b/src/Commands/Rollback.php @@ -15,6 +15,7 @@ use Stancl\Tenancy\Concerns\ParallelCommand; use Stancl\Tenancy\Database\Exceptions\TenantDatabaseDoesNotExistException; use Stancl\Tenancy\Events\DatabaseRolledBack; use Stancl\Tenancy\Events\RollingBackDatabase; +use Symfony\Component\Console\Output\OutputInterface as OI; class Rollback extends RollbackCommand { @@ -42,7 +43,7 @@ class Rollback extends RollbackCommand if ($this->getProcesses() > 1) { return $this->runConcurrently($this->getTenantChunks()->map(function ($chunk) { - return $this->getTenants($chunk->all()); + return $this->getTenants($chunk); })); } @@ -70,14 +71,29 @@ class Rollback extends RollbackCommand foreach ($tenants as $tenant) { try { - $this->components->info("Tenant {$tenant->getTenantKey()}"); + $this->components->info("Rolling back tenant {$tenant->getTenantKey()}"); $tenant->run(function ($tenant) use (&$success) { event(new RollingBackDatabase($tenant)); - // Rollback - if (parent::handle() !== 0) { - $success = false; + $verbosity = (int) $this->output->getVerbosity(); + + if ($this->runningConcurrently) { + // The output gets messy when multiple processes are writing to the same stdout + $this->output->setVerbosity(OI::VERBOSITY_QUIET); + } + + try { + // Rollback + if (parent::handle() !== 0) { + $success = false; + } + } finally { + $this->output->setVerbosity($verbosity); + } + + if ($this->runningConcurrently) { + $this->components->success("Rolled back tenant {$tenant->getTenantKey()}"); } event(new DatabaseRolledBack($tenant)); diff --git a/src/Concerns/ParallelCommand.php b/src/Concerns/ParallelCommand.php index 55383788..1da3e7d9 100644 --- a/src/Concerns/ParallelCommand.php +++ b/src/Concerns/ParallelCommand.php @@ -5,23 +5,37 @@ declare(strict_types=1); namespace Stancl\Tenancy\Concerns; use ArrayAccess; +use Countable; +use Exception; +use FFI; use Illuminate\Support\Collection; use Illuminate\Support\Facades\DB; use Symfony\Component\Console\Input\InputOption; trait ParallelCommand { - public const MAX_PROCESSES = 24; + public const MAX_PROCESSES = 32; + protected bool $runningConcurrently = false; abstract protected function childHandle(mixed ...$args): bool; public function addProcessesOption(): void { - $this->addOption('processes', 'p', InputOption::VALUE_OPTIONAL, 'How many processes to spawn. Maximum value: ' . static::MAX_PROCESSES . ', recommended value: core count', 1); + $this->addOption( + 'processes', + 'p', + InputOption::VALUE_OPTIONAL, + 'How many processes to spawn. Maximum value: ' . static::MAX_PROCESSES . ', recommended value: core count (use just -p)', + -1, + ); } protected function forkProcess(mixed ...$args): int { + if (! app()->runningInConsole()) { + throw new Exception('Parallel commands are only available in CLI context.'); + } + $pid = pcntl_fork(); if ($pid === -1) { @@ -37,16 +51,67 @@ trait ParallelCommand } } + protected function sysctlGetLogicalCoreCount(bool $darwin): int + { + $ffi = FFI::cdef('int sysctlbyname(const char *name, void *oldp, size_t *oldlenp, void *newp, size_t newlen);'); + + $cores = $ffi->new('int'); + $size = $ffi->new('size_t'); + $size->cdata = FFI::sizeof($cores); + + // perflevel0 refers to P-cores on M-series, and the entire CPU on Intel Macs + if ($darwin && $ffi->sysctlbyname('hw.xperflevel0.logicalcpu', FFI::addr($cores), FFI::addr($size), null, 0) === 0) { + return $size->cdata; + } else if ($darwin) { + // Reset the size in case the pointer got written to (likely shouldn't happen) + $size->cdata = FFI::sizeof($cores); + } + + // This should return the total number of logical cores on any BSD-based system + if ($ffi->sysctlbyname('hw.ncpu', FFI::addr($cores), FFI::addr($size), null, 0) == -1) { + return -1; + } + + return $cores->cdata; + } + + protected function getLogicalCoreCount(): int + { + // We use the logical core count as it should work best for I/O bound code + return match (PHP_OS_FAMILY) { + 'Windows' => (int) getenv('NUMBER_OF_PROCESSORS'), + 'Linux' => substr_count(file_get_contents('/proc/cpuinfo'), 'processor'), + 'Darwin', 'BSD' => $this->sysctlGetLogicalCoreCount(PHP_OS_FAMILY === 'Darwin'), + }; + } + protected function getProcesses(): int { - $processes = (int) $this->input->getOption('processes'); + $processes = $this->input->getOption('processes'); - if (($processes < 0) || ($processes > static::MAX_PROCESSES)) { + if ($processes === null) { + // This is used when the option is set but *without* a value (-p). + $processes = $this->getLogicalCoreCount(); + } else if ((int) $processes === -1) { + // Default value we set for the option -- this is used when the option is *not set*. + $processes = 1; + } else { + // Option value set by the user. + $processes = (int) $processes; + } + + if ($processes < 0) { // can come from sysctlGetLogicalCoreCount() + $this->components->error('Minimum value for processes is 1. Try specifying -p manually.'); + exit(1); + } + + if ($processes > static::MAX_PROCESSES) { $this->components->error('Maximum value for processes is ' . static::MAX_PROCESSES); exit(1); } if ($processes > 1 && ! function_exists('pcntl_fork')) { + exit(1); $this->components->error('The pcntl extension is required for parallel migrations to work.'); } @@ -54,7 +119,7 @@ trait ParallelCommand } /** - * @return Collection>> + * @return Collection>> */ protected function getTenantChunks(): Collection { @@ -64,20 +129,26 @@ trait ParallelCommand return $tenants->chunk((int) ceil($tenants->count() / $this->getProcesses()))->map(function ($chunk) { $chunk = array_values($chunk->all()); - /** @var Collection $chunk */ + /** @var array $chunk */ return $chunk; }); } /** - * @param array|ArrayAccess|null $args + * @param array|(ArrayAccess&Countable)|null $args */ - protected function runConcurrently(array|ArrayAccess|null $args = null): int + protected function runConcurrently(array|(ArrayAccess&Countable)|null $args = null): int { $processes = $this->getProcesses(); $success = true; $pids = []; + if (count($args) < $processes) { + $processes = count($args); + } + + $this->runningConcurrently = true; + for ($i = 0; $i < $processes; $i++) { $pid = $this->forkProcess($args !== null ? $args[$i] : null); @@ -101,7 +172,7 @@ trait ParallelCommand $exitCode = pcntl_wexitstatus($status); if ($exitCode === 0) { - $this->components->info("Child process [$i] (PID $pid) finished successfully."); + $this->components->success("Child process [$i] (PID $pid) finished successfully."); } else { $success = false; $this->components->error("Child process [$i] (PID $pid) completed with failures."); diff --git a/typedefs/FFI.php b/typedefs/FFI.php new file mode 100644 index 00000000..16123957 --- /dev/null +++ b/typedefs/FFI.php @@ -0,0 +1,8 @@ +