Verzeichnisstruktur phpBB-3.3.15


Veröffentlicht
28.08.2024

So funktioniert es


Auf das letzte Element klicken. Dies geht jeweils einen Schritt zurück.

Auf das Icon klicken, dies öffnet das Verzeichnis. Nochmal klicken schließt das Verzeichnis.
Auf den Verzeichnisnamen klicken, dies zeigt nur das Verzeichnis mit Inhalt an

(Beispiel Datei-Icons)

Auf das Icon klicken, um den Quellcode anzuzeigen

EachPromise.php

Zuletzt modifiziert: 02.04.2025, 15:03 - Dateigröße: 7.60 KiB


001  <?php
002   
003  namespace GuzzleHttp\Promise;
004   
/**
 * Aggregates an iterable of promises (or plain values) behind one promise
 * and runs optional side-effect callbacks as each item settles.
 *
 * The aggregate is resolved with null once no promise is pending and the
 * iterator is exhausted; it is rejected when starting or advancing the
 * iteration throws.
 */
class EachPromise implements PromisorInterface
{
    /** @var array|null Promises currently in flight, keyed by an internal counter */
    private $pending = [];

    /** @var int Next unused key for $pending */
    private $nextPendingIndex = 0;

    /** @var \Iterator|null */
    private $iterable;

    /** @var callable|int|null */
    private $concurrency;

    /** @var callable|null */
    private $onFulfilled;

    /** @var callable|null */
    private $onRejected;

    /** @var Promise|null */
    private $aggregate;

    /** @var bool|null True while the iterator is being advanced */
    private $mutex;

    /**
     * Recognised keys of the configuration array:
     *
     * - fulfilled: (callable) Called whenever one of the promises fulfills.
     *   It receives three arguments: the fulfillment value, the iterator
     *   key the promise was read under, and the aggregate promise. Settling
     *   the aggregate from inside the callback short-circuits the
     *   remaining work.
     * - rejected: (callable) Called whenever one of the promises is
     *   rejected. It receives three arguments: the rejection reason, the
     *   iterator key the promise was read under, and the aggregate promise.
     *   Settling the aggregate from inside the callback short-circuits the
     *   remaining work.
     * - concurrency: (integer|callable) Upper bound on the number of
     *   promises kept pending at once. A callable is invoked with the
     *   current number of pending promises and its result is used as the
     *   bound. Without this option every item is added immediately.
     *
     * @param mixed $iterable Promises or values to iterate.
     * @param array $config   Configuration options
     */
    public function __construct($iterable, array $config = [])
    {
        $this->iterable = Create::iterFor($iterable);

        // Each option is copied only when present and non-null, otherwise
        // the property keeps its default of null.
        if (isset($config['concurrency'])) {
            $this->concurrency = $config['concurrency'];
        }

        if (isset($config['fulfilled'])) {
            $this->onFulfilled = $config['fulfilled'];
        }

        if (isset($config['rejected'])) {
            $this->onRejected = $config['rejected'];
        }
    }

    /**
     * Returns the aggregate promise, creating it and starting the
     * iteration the first time this method is called.
     *
     * @psalm-suppress InvalidNullableReturnType
     */
    public function promise()
    {
        if (!$this->aggregate) {
            try {
                $this->createPromise();
                /** @psalm-assert Promise $this->aggregate */
                $this->iterable->rewind();
                $this->refillPending();
            } catch (\Throwable $error) {
                $this->aggregate->reject($error);
            } catch (\Exception $error) {
                // Presumably kept for PHP 5, where exceptions do not
                // implement \Throwable.
                $this->aggregate->reject($error);
            }
        }

        /**
         * @psalm-suppress NullableReturnStatement
         * @phpstan-ignore-next-line
         */
        return $this->aggregate;
    }

    /**
     * Builds the aggregate promise together with its wait function and
     * registers the cleanup that runs once the aggregate settles.
     */
    private function createPromise()
    {
        $this->mutex = false;

        $waitFn = function () {
            if ($this->checkIfFinished()) {
                return;
            }
            reset($this->pending);
            // The pending list can grow and shrink while we wait, so walk
            // it through the array's internal pointer; this keeps the
            // indexes intact (which array_shift would not).
            while ($inFlight = current($this->pending)) {
                next($this->pending);
                $inFlight->wait();
                if (Is::settled($this->aggregate)) {
                    return;
                }
            }
        };

        $this->aggregate = new Promise($waitFn);

        // Drop every held reference as soon as the aggregate is settled,
        // whichever way it settles.
        $releaseState = function () {
            $this->pending = null;
            $this->concurrency = null;
            $this->iterable = null;
            $this->onRejected = null;
            $this->onFulfilled = null;
            $this->nextPendingIndex = 0;
        };

        $this->aggregate->then($releaseState, $releaseState);
    }

    /**
     * Tops up the pending list from the iterator, honouring the
     * concurrency setting when one is configured.
     */
    private function refillPending()
    {
        if (!$this->concurrency) {
            // No cap: keep adding until the iterator runs dry or cannot
            // be advanced.
            do {
                $keepGoing = $this->addPending() && $this->advanceIterator();
            } while ($keepGoing);

            return;
        }

        // A cap is configured: work out how many more promises fit.
        if (is_callable($this->concurrency)) {
            $limit = call_user_func($this->concurrency, count($this->pending));
        } else {
            $limit = $this->concurrency;
        }
        $freeSlots = max($limit - count($this->pending), 0);

        // The effective limit may be 0, which blocks new promises.
        if (!$freeSlots) {
            return;
        }

        // The first promise is added without touching the iterator.
        $this->addPending();

        // With a single free slot the iterator is deliberately NOT advanced
        // after that first add: a generator may be unable to yield its
        // next value until promise callbacks have run.
        while (--$freeSlots
            && $this->advanceIterator()
            && $this->addPending()
        ) {
            // The loop condition does all the work.
        }
    }

    /**
     * Wraps the iterator's current item in a promise and tracks it as
     * pending.
     *
     * @return bool False when there is no valid current item, true otherwise.
     */
    private function addPending()
    {
        if (!($this->iterable && $this->iterable->valid())) {
            return false;
        }

        $item = Create::promiseFor($this->iterable->current());
        $iterKey = $this->iterable->key();

        // Iterator keys are not guaranteed to be unique, so pending
        // promises are tracked under a private running counter instead.
        $slot = $this->nextPendingIndex++;

        $this->pending[$slot] = $item->then(
            function ($value) use ($slot, $iterKey) {
                if ($this->onFulfilled) {
                    call_user_func(
                        $this->onFulfilled,
                        $value,
                        $iterKey,
                        $this->aggregate
                    );
                }
                $this->step($slot);
            },
            function ($reason) use ($slot, $iterKey) {
                if ($this->onRejected) {
                    call_user_func(
                        $this->onRejected,
                        $reason,
                        $iterKey,
                        $this->aggregate
                    );
                }
                $this->step($slot);
            }
        );

        return true;
    }

    /**
     * Moves the iterator forward by one item unless it is already being
     * advanced.
     *
     * @return bool True when the iterator was advanced; false when the lock
     *              was held or the iterator threw (the aggregate is then
     *              rejected with the thrown error).
     */
    private function advanceIterator()
    {
        // The lock prevents re-entering the iterator, which would cause
        // fatal generator errors.
        if ($this->mutex) {
            return false;
        }

        $this->mutex = true;

        try {
            $this->iterable->next();
            $advanced = true;
        } catch (\Throwable $error) {
            $this->aggregate->reject($error);
            $advanced = false;
        } catch (\Exception $error) {
            $this->aggregate->reject($error);
            $advanced = false;
        }

        $this->mutex = false;

        return $advanced;
    }

    /**
     * Bookkeeping after one pending promise has settled: forget it, then
     * try to pull in more work.
     *
     * @param int $idx Key of the settled promise inside the pending list.
     */
    private function step($idx)
    {
        // Nothing to do once the aggregate has been settled.
        if (Is::settled($this->aggregate)) {
            return;
        }

        unset($this->pending[$idx]);

        // Refill only when the iterator could be advanced, i.e. we are not
        // already inside it; re-entering a generator fails with
        // "Cannot resume an already running generator".
        if (!$this->advanceIterator()) {
            return;
        }

        if ($this->checkIfFinished()) {
            return;
        }

        // Add more pending promises if possible.
        $this->refillPending();
    }

    /**
     * Resolves the aggregate with null when nothing is pending and the
     * iterator has no valid current item.
     *
     * @return bool True when the aggregate was resolved by this call.
     */
    private function checkIfFinished()
    {
        if ($this->pending || $this->iterable->valid()) {
            return false;
        }

        // Nothing left to do.
        $this->aggregate->resolve(null);

        return true;
    }
}
248