mirror of
https://github.com/archtechx/tenancy.git
synced 2025-12-12 13:34:03 +00:00
parallel commands: core # autodetect, bugfixes, improved output
This commit is contained in:
parent
b4a055315b
commit
39bcbda5d0
6 changed files with 138 additions and 19 deletions
1
.gitattributes
vendored
1
.gitattributes
vendored
|
|
@ -22,3 +22,4 @@
|
||||||
/t export-ignore
|
/t export-ignore
|
||||||
/test export-ignore
|
/test export-ignore
|
||||||
/tests export-ignore
|
/tests export-ignore
|
||||||
|
/typedefs export-ignore
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ use Stancl\Tenancy\Concerns\ParallelCommand;
|
||||||
use Stancl\Tenancy\Database\Exceptions\TenantDatabaseDoesNotExistException;
|
use Stancl\Tenancy\Database\Exceptions\TenantDatabaseDoesNotExistException;
|
||||||
use Stancl\Tenancy\Events\DatabaseMigrated;
|
use Stancl\Tenancy\Events\DatabaseMigrated;
|
||||||
use Stancl\Tenancy\Events\MigratingDatabase;
|
use Stancl\Tenancy\Events\MigratingDatabase;
|
||||||
|
use Symfony\Component\Console\Output\OutputInterface as OI;
|
||||||
|
|
||||||
class Migrate extends MigrateCommand
|
class Migrate extends MigrateCommand
|
||||||
{
|
{
|
||||||
|
|
@ -52,7 +53,7 @@ class Migrate extends MigrateCommand
|
||||||
|
|
||||||
if ($this->getProcesses() > 1) {
|
if ($this->getProcesses() > 1) {
|
||||||
return $this->runConcurrently($this->getTenantChunks()->map(function ($chunk) {
|
return $this->runConcurrently($this->getTenantChunks()->map(function ($chunk) {
|
||||||
return $this->getTenants($chunk->all());
|
return $this->getTenants($chunk);
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -80,9 +81,25 @@ class Migrate extends MigrateCommand
|
||||||
$tenant->run(function ($tenant) use (&$success) {
|
$tenant->run(function ($tenant) use (&$success) {
|
||||||
event(new MigratingDatabase($tenant));
|
event(new MigratingDatabase($tenant));
|
||||||
|
|
||||||
// Migrate
|
$verbosity = (int) $this->output->getVerbosity();
|
||||||
if (parent::handle() !== 0) {
|
|
||||||
$success = false;
|
if ($this->runningConcurrently) {
|
||||||
|
// The output gets messy when multiple processes are writing to the same stdout
|
||||||
|
$this->output->setVerbosity(OI::VERBOSITY_QUIET);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Migrate
|
||||||
|
if (parent::handle() !== 0) {
|
||||||
|
$success = false;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
$this->output->setVerbosity($verbosity);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($this->runningConcurrently) {
|
||||||
|
// todo@cli the Migrating info above always has extra spaces, and the success below does WHEN there is work that got done by the block above. same in Rollback
|
||||||
|
$this->components->success("Migrated tenant {$tenant->getTenantKey()}");
|
||||||
}
|
}
|
||||||
|
|
||||||
event(new DatabaseMigrated($tenant));
|
event(new DatabaseMigrated($tenant));
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,7 @@ class MigrateFresh extends BaseCommand
|
||||||
|
|
||||||
if ($this->getProcesses() > 1) {
|
if ($this->getProcesses() > 1) {
|
||||||
return $this->runConcurrently($this->getTenantChunks()->map(function ($chunk) {
|
return $this->runConcurrently($this->getTenantChunks()->map(function ($chunk) {
|
||||||
return $this->getTenants($chunk->all());
|
return $this->getTenants($chunk);
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -81,6 +81,8 @@ class MigrateFresh extends BaseCommand
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Only used when running concurrently.
|
||||||
|
*
|
||||||
* @param LazyCollection<covariant int|string, \Stancl\Tenancy\Contracts\Tenant&\Illuminate\Database\Eloquent\Model> $tenants
|
* @param LazyCollection<covariant int|string, \Stancl\Tenancy\Contracts\Tenant&\Illuminate\Database\Eloquent\Model> $tenants
|
||||||
*/
|
*/
|
||||||
protected function migrateFreshTenants(LazyCollection $tenants): bool
|
protected function migrateFreshTenants(LazyCollection $tenants): bool
|
||||||
|
|
@ -89,6 +91,8 @@ class MigrateFresh extends BaseCommand
|
||||||
|
|
||||||
foreach ($tenants as $tenant) {
|
foreach ($tenants as $tenant) {
|
||||||
try {
|
try {
|
||||||
|
$this->components->info("Migrating (fresh) tenant {$tenant->getTenantKey()}");
|
||||||
|
|
||||||
$tenant->run(function ($tenant) use (&$success) {
|
$tenant->run(function ($tenant) use (&$success) {
|
||||||
$this->components->info("Wiping database of tenant {$tenant->getTenantKey()}", OI::VERBOSITY_VERY_VERBOSE);
|
$this->components->info("Wiping database of tenant {$tenant->getTenantKey()}", OI::VERBOSITY_VERY_VERBOSE);
|
||||||
if ($this->wipeDB()) {
|
if ($this->wipeDB()) {
|
||||||
|
|
@ -105,6 +109,8 @@ class MigrateFresh extends BaseCommand
|
||||||
$success = false;
|
$success = false;
|
||||||
$this->components->error("Migrating database of tenant {$tenant->getTenantKey()} failed!");
|
$this->components->error("Migrating database of tenant {$tenant->getTenantKey()} failed!");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$this->components->success("Migrated (fresh) tenant {$tenant->getTenantKey()}");
|
||||||
});
|
});
|
||||||
} catch (TenantDatabaseDoesNotExistException|QueryException $e) {
|
} catch (TenantDatabaseDoesNotExistException|QueryException $e) {
|
||||||
$this->components->error("Migration failed for tenant {$tenant->getTenantKey()}: {$e->getMessage()}");
|
$this->components->error("Migration failed for tenant {$tenant->getTenantKey()}: {$e->getMessage()}");
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ use Stancl\Tenancy\Concerns\ParallelCommand;
|
||||||
use Stancl\Tenancy\Database\Exceptions\TenantDatabaseDoesNotExistException;
|
use Stancl\Tenancy\Database\Exceptions\TenantDatabaseDoesNotExistException;
|
||||||
use Stancl\Tenancy\Events\DatabaseRolledBack;
|
use Stancl\Tenancy\Events\DatabaseRolledBack;
|
||||||
use Stancl\Tenancy\Events\RollingBackDatabase;
|
use Stancl\Tenancy\Events\RollingBackDatabase;
|
||||||
|
use Symfony\Component\Console\Output\OutputInterface as OI;
|
||||||
|
|
||||||
class Rollback extends RollbackCommand
|
class Rollback extends RollbackCommand
|
||||||
{
|
{
|
||||||
|
|
@ -42,7 +43,7 @@ class Rollback extends RollbackCommand
|
||||||
|
|
||||||
if ($this->getProcesses() > 1) {
|
if ($this->getProcesses() > 1) {
|
||||||
return $this->runConcurrently($this->getTenantChunks()->map(function ($chunk) {
|
return $this->runConcurrently($this->getTenantChunks()->map(function ($chunk) {
|
||||||
return $this->getTenants($chunk->all());
|
return $this->getTenants($chunk);
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -70,14 +71,29 @@ class Rollback extends RollbackCommand
|
||||||
|
|
||||||
foreach ($tenants as $tenant) {
|
foreach ($tenants as $tenant) {
|
||||||
try {
|
try {
|
||||||
$this->components->info("Tenant {$tenant->getTenantKey()}");
|
$this->components->info("Rolling back tenant {$tenant->getTenantKey()}");
|
||||||
|
|
||||||
$tenant->run(function ($tenant) use (&$success) {
|
$tenant->run(function ($tenant) use (&$success) {
|
||||||
event(new RollingBackDatabase($tenant));
|
event(new RollingBackDatabase($tenant));
|
||||||
|
|
||||||
// Rollback
|
$verbosity = (int) $this->output->getVerbosity();
|
||||||
if (parent::handle() !== 0) {
|
|
||||||
$success = false;
|
if ($this->runningConcurrently) {
|
||||||
|
// The output gets messy when multiple processes are writing to the same stdout
|
||||||
|
$this->output->setVerbosity(OI::VERBOSITY_QUIET);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Rollback
|
||||||
|
if (parent::handle() !== 0) {
|
||||||
|
$success = false;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
$this->output->setVerbosity($verbosity);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($this->runningConcurrently) {
|
||||||
|
$this->components->success("Rolled back tenant {$tenant->getTenantKey()}");
|
||||||
}
|
}
|
||||||
|
|
||||||
event(new DatabaseRolledBack($tenant));
|
event(new DatabaseRolledBack($tenant));
|
||||||
|
|
|
||||||
|
|
@ -5,23 +5,37 @@ declare(strict_types=1);
|
||||||
namespace Stancl\Tenancy\Concerns;
|
namespace Stancl\Tenancy\Concerns;
|
||||||
|
|
||||||
use ArrayAccess;
|
use ArrayAccess;
|
||||||
|
use Countable;
|
||||||
|
use Exception;
|
||||||
|
use FFI;
|
||||||
use Illuminate\Support\Collection;
|
use Illuminate\Support\Collection;
|
||||||
use Illuminate\Support\Facades\DB;
|
use Illuminate\Support\Facades\DB;
|
||||||
use Symfony\Component\Console\Input\InputOption;
|
use Symfony\Component\Console\Input\InputOption;
|
||||||
|
|
||||||
trait ParallelCommand
|
trait ParallelCommand
|
||||||
{
|
{
|
||||||
public const MAX_PROCESSES = 24;
|
public const MAX_PROCESSES = 32;
|
||||||
|
protected bool $runningConcurrently = false;
|
||||||
|
|
||||||
abstract protected function childHandle(mixed ...$args): bool;
|
abstract protected function childHandle(mixed ...$args): bool;
|
||||||
|
|
||||||
public function addProcessesOption(): void
|
public function addProcessesOption(): void
|
||||||
{
|
{
|
||||||
$this->addOption('processes', 'p', InputOption::VALUE_OPTIONAL, 'How many processes to spawn. Maximum value: ' . static::MAX_PROCESSES . ', recommended value: core count', 1);
|
$this->addOption(
|
||||||
|
'processes',
|
||||||
|
'p',
|
||||||
|
InputOption::VALUE_OPTIONAL,
|
||||||
|
'How many processes to spawn. Maximum value: ' . static::MAX_PROCESSES . ', recommended value: core count (use just -p)',
|
||||||
|
-1,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function forkProcess(mixed ...$args): int
|
protected function forkProcess(mixed ...$args): int
|
||||||
{
|
{
|
||||||
|
if (! app()->runningInConsole()) {
|
||||||
|
throw new Exception('Parallel commands are only available in CLI context.');
|
||||||
|
}
|
||||||
|
|
||||||
$pid = pcntl_fork();
|
$pid = pcntl_fork();
|
||||||
|
|
||||||
if ($pid === -1) {
|
if ($pid === -1) {
|
||||||
|
|
@ -37,16 +51,67 @@ trait ParallelCommand
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected function sysctlGetLogicalCoreCount(bool $darwin): int
|
||||||
|
{
|
||||||
|
$ffi = FFI::cdef('int sysctlbyname(const char *name, void *oldp, size_t *oldlenp, void *newp, size_t newlen);');
|
||||||
|
|
||||||
|
$cores = $ffi->new('int');
|
||||||
|
$size = $ffi->new('size_t');
|
||||||
|
$size->cdata = FFI::sizeof($cores);
|
||||||
|
|
||||||
|
// perflevel0 refers to P-cores on M-series, and the entire CPU on Intel Macs
|
||||||
|
if ($darwin && $ffi->sysctlbyname('hw.xperflevel0.logicalcpu', FFI::addr($cores), FFI::addr($size), null, 0) === 0) {
|
||||||
|
return $size->cdata;
|
||||||
|
} else if ($darwin) {
|
||||||
|
// Reset the size in case the pointer got written to (likely shouldn't happen)
|
||||||
|
$size->cdata = FFI::sizeof($cores);
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should return the total number of logical cores on any BSD-based system
|
||||||
|
if ($ffi->sysctlbyname('hw.ncpu', FFI::addr($cores), FFI::addr($size), null, 0) == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return $cores->cdata;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function getLogicalCoreCount(): int
|
||||||
|
{
|
||||||
|
// We use the logical core count as it should work best for I/O bound code
|
||||||
|
return match (PHP_OS_FAMILY) {
|
||||||
|
'Windows' => (int) getenv('NUMBER_OF_PROCESSORS'),
|
||||||
|
'Linux' => substr_count(file_get_contents('/proc/cpuinfo'), 'processor'),
|
||||||
|
'Darwin', 'BSD' => $this->sysctlGetLogicalCoreCount(PHP_OS_FAMILY === 'Darwin'),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
protected function getProcesses(): int
|
protected function getProcesses(): int
|
||||||
{
|
{
|
||||||
$processes = (int) $this->input->getOption('processes');
|
$processes = $this->input->getOption('processes');
|
||||||
|
|
||||||
if (($processes < 0) || ($processes > static::MAX_PROCESSES)) {
|
if ($processes === null) {
|
||||||
|
// This is used when the option is set but *without* a value (-p).
|
||||||
|
$processes = $this->getLogicalCoreCount();
|
||||||
|
} else if ((int) $processes === -1) {
|
||||||
|
// Default value we set for the option -- this is used when the option is *not set*.
|
||||||
|
$processes = 1;
|
||||||
|
} else {
|
||||||
|
// Option value set by the user.
|
||||||
|
$processes = (int) $processes;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($processes < 0) { // can come from sysctlGetLogicalCoreCount()
|
||||||
|
$this->components->error('Minimum value for processes is 1. Try specifying -p manually.');
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($processes > static::MAX_PROCESSES) {
|
||||||
$this->components->error('Maximum value for processes is ' . static::MAX_PROCESSES);
|
$this->components->error('Maximum value for processes is ' . static::MAX_PROCESSES);
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($processes > 1 && ! function_exists('pcntl_fork')) {
|
if ($processes > 1 && ! function_exists('pcntl_fork')) {
|
||||||
|
exit(1);
|
||||||
$this->components->error('The pcntl extension is required for parallel migrations to work.');
|
$this->components->error('The pcntl extension is required for parallel migrations to work.');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -54,7 +119,7 @@ trait ParallelCommand
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Collection<int, Collection<int, \Stancl\Tenancy\Contracts\Tenant&\Illuminate\Database\Eloquent\Model>>>
|
* @return Collection<int, array<int, \Stancl\Tenancy\Contracts\Tenant&\Illuminate\Database\Eloquent\Model>>>
|
||||||
*/
|
*/
|
||||||
protected function getTenantChunks(): Collection
|
protected function getTenantChunks(): Collection
|
||||||
{
|
{
|
||||||
|
|
@ -64,20 +129,26 @@ trait ParallelCommand
|
||||||
return $tenants->chunk((int) ceil($tenants->count() / $this->getProcesses()))->map(function ($chunk) {
|
return $tenants->chunk((int) ceil($tenants->count() / $this->getProcesses()))->map(function ($chunk) {
|
||||||
$chunk = array_values($chunk->all());
|
$chunk = array_values($chunk->all());
|
||||||
|
|
||||||
/** @var Collection<int, \Stancl\Tenancy\Contracts\Tenant&\Illuminate\Database\Eloquent\Model> $chunk */
|
/** @var array<int, \Stancl\Tenancy\Contracts\Tenant&\Illuminate\Database\Eloquent\Model> $chunk */
|
||||||
return $chunk;
|
return $chunk;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param array|ArrayAccess<int, mixed>|null $args
|
* @param array|(ArrayAccess<int, mixed>&Countable)|null $args
|
||||||
*/
|
*/
|
||||||
protected function runConcurrently(array|ArrayAccess|null $args = null): int
|
protected function runConcurrently(array|(ArrayAccess&Countable)|null $args = null): int
|
||||||
{
|
{
|
||||||
$processes = $this->getProcesses();
|
$processes = $this->getProcesses();
|
||||||
$success = true;
|
$success = true;
|
||||||
$pids = [];
|
$pids = [];
|
||||||
|
|
||||||
|
if (count($args) < $processes) {
|
||||||
|
$processes = count($args);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->runningConcurrently = true;
|
||||||
|
|
||||||
for ($i = 0; $i < $processes; $i++) {
|
for ($i = 0; $i < $processes; $i++) {
|
||||||
$pid = $this->forkProcess($args !== null ? $args[$i] : null);
|
$pid = $this->forkProcess($args !== null ? $args[$i] : null);
|
||||||
|
|
||||||
|
|
@ -101,7 +172,7 @@ trait ParallelCommand
|
||||||
$exitCode = pcntl_wexitstatus($status);
|
$exitCode = pcntl_wexitstatus($status);
|
||||||
|
|
||||||
if ($exitCode === 0) {
|
if ($exitCode === 0) {
|
||||||
$this->components->info("Child process [$i] (PID $pid) finished successfully.");
|
$this->components->success("Child process [$i] (PID $pid) finished successfully.");
|
||||||
} else {
|
} else {
|
||||||
$success = false;
|
$success = false;
|
||||||
$this->components->error("Child process [$i] (PID $pid) completed with failures.");
|
$this->components->error("Child process [$i] (PID $pid) completed with failures.");
|
||||||
|
|
|
||||||
8
typedefs/FFI.php
Normal file
8
typedefs/FFI.php
Normal file
|
|
@ -0,0 +1,8 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
// Stub for Intelephense support
|
||||||
|
class FFI
|
||||||
|
{
|
||||||
|
public function __call($name, $arguments) {}
|
||||||
|
public function __callStatic($name, $arguments) {}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue