make(Dispatcher::class));
    }

    /**
     * Store the injected config repository and queue manager, then register
     * the callback that adds tenant information to queued job payloads.
     */
    public function __construct(Repository $config, QueueManager $queue)
    {
        $this->config = $config;
        $this->queue = $queue;

        $this->setUpPayloadGenerator();
    }

    /**
     * Register queue event listeners that switch the tenancy context around jobs.
     *
     * When a job starts processing (or a retry is requested), the current tenant is
     * remembered and tenancy is initialized from the `tenant_id` stored in the payload.
     * When the job has been processed or has failed, the remembered state is restored.
     */
    protected static function setUpJobListener(Dispatcher $dispatcher): void
    {
        // Captured by reference so that every listener below reads and writes the same value.
        $previousTenant = null;

        $dispatcher->listen(JobProcessing::class, function ($event) use (&$previousTenant) {
            $previousTenant = tenant();

            static::initializeTenancyForQueue($event->job->payload()['tenant_id'] ?? null);
        });

        $dispatcher->listen(JobRetryRequested::class, function ($event) use (&$previousTenant) {
            $previousTenant = tenant();

            static::initializeTenancyForQueue($event->payload()['tenant_id'] ?? null);
        });

        // If we're running tests, we make sure to clean up after any artisan('queue:work') calls
        $revertToPreviousState = function ($event) use (&$previousTenant) {
            static::revertToPreviousState($event, $previousTenant);
        };

        $dispatcher->listen(JobProcessed::class, $revertToPreviousState); // artisan('queue:work') which succeeds
        $dispatcher->listen(JobFailed::class, $revertToPreviousState); // artisan('queue:work') which fails
    }

    /**
     * Put the application into the tenancy context required by a job.
     *
     * @param string|int|null $tenantId The `tenant_id` from the job payload; null means the job is not tenant-aware.
     */
    protected static function initializeTenancyForQueue(string|int|null $tenantId): void
    {
        if ($tenantId === null) {
            // The job is not tenant-aware
            if (tenancy()->initialized) {
                // Tenancy was initialized, so we revert back to the central context
                tenancy()->end();
            }

            return;
        }

        if (static::$forceRefresh) {
            // Re-initialize tenancy between all jobs
            if (tenancy()->initialized) {
                tenancy()->end();
            }

            /** @var Tenant $tenant */
            $tenant = tenancy()->find($tenantId);
            tenancy()->initialize($tenant);

            return;
        }

        if (tenancy()->initialized && tenant()->getTenantKey() === $tenantId) {
            // Tenancy is already initialized for the same tenant
            // (e.g. dispatchSync was used, or the previous job also ran for this tenant)
            return;
        }

        // Tenancy was either not initialized, or initialized for a different tenant.
        // Therefore, we initialize it for the correct tenant.

        /** @var Tenant $tenant */
        $tenant = tenancy()->find($tenantId);
        tenancy()->initialize($tenant);
    }

    /**
     * Restore the tenancy state that was active before a tenant-aware job ran.
     *
     * @param JobProcessed|JobFailed $event          The event fired once the job finished.
     * @param Tenant|null            $previousTenant The tenant that was current before the job started, if any.
     */
    protected static function revertToPreviousState(JobProcessed|JobFailed $event, ?Tenant &$previousTenant): void
    {
        $tenantId = $event->job->payload()['tenant_id'] ?? null;

        // The job was not tenant-aware.
        // Strict null check, matching initializeTenancyForQueue(): a falsy key such as 0 or '0'
        // still identifies a tenant, so tenancy initialized for it must be reverted as well.
        if ($tenantId === null) {
            return;
        }

        // Revert back to the previous tenant
        if (tenant() && $previousTenant && $previousTenant->isNot(tenant())) {
            tenancy()->initialize($previousTenant);
        }

        // End tenancy
        if (tenant() && (! $previousTenant)) {
            tenancy()->end();
        }
    }

    /**
     * Register a payload callback on the queue so that dispatched jobs carry
     * the data returned by getPayload(). Skipped when the queue is a QueueFake.
     */
    protected function setUpPayloadGenerator(): void
    {
        if ($this->queue instanceof QueueFake) {
            return;
        }

        $this->queue->createPayloadUsing(fn ($connection) => $this->getPayload($connection));
    }

    /**
     * Intentionally empty: this method performs no work when tenancy is bootstrapped.
     */
    public function bootstrap(Tenant $tenant): void
    {
        //
    }

    /**
     * Intentionally empty: this method performs no work when tenancy is reverted.
     */
    public function revert(): void
    {
        //
    }

    /**
     * Build the extra payload data for a job pushed onto the given connection.
     *
     * @param string $connection Name of the queue connection the job is pushed to.
     *
     * @return array Empty when tenancy is not initialized or when the connection's
     *               `central` config option is truthy; otherwise contains the current
     *               tenant's key under `tenant_id`.
     */
    public function getPayload(string $connection): array
    {
        if (! tenancy()->initialized) {
            return [];
        }

        if ($this->config["queue.connections.$connection.central"]) {
            return [];
        }

        return [
            'tenant_id' => tenant()->getTenantKey(),
        ];
    }
}