Verzeichnisstruktur phpBB-3.3.15
- Veröffentlicht
- 28.08.2024
So funktioniert es
|
Auf das letzte Element klicken. Dies geht jeweils ein Schritt zurück |
Auf das Icon klicken, dies öffnet das Verzeichnis. Nochmal klicken schließt das Verzeichnis. |
|
(Beispiel Datei-Icons)
|
Auf das Icon klicken um den Quellcode anzuzeigen |
EachPromise.php
001 <?php
002
003 namespace GuzzleHttp\Promise;
004
005 /**
006 * Represents a promise that iterates over many promises and invokes
007 * side-effect functions in the process.
008 */
009 class EachPromise implements PromisorInterface
010 {
011 private $pending = [];
012
013 private $nextPendingIndex = 0;
014
015 /** @var \Iterator|null */
016 private $iterable;
017
018 /** @var callable|int|null */
019 private $concurrency;
020
021 /** @var callable|null */
022 private $onFulfilled;
023
024 /** @var callable|null */
025 private $onRejected;
026
027 /** @var Promise|null */
028 private $aggregate;
029
030 /** @var bool|null */
031 private $mutex;
032
033 /**
034 * Configuration hash can include the following key value pairs:
035 *
036 * - fulfilled: (callable) Invoked when a promise fulfills. The function
037 * is invoked with three arguments: the fulfillment value, the index
038 * position from the iterable list of the promise, and the aggregate
039 * promise that manages all of the promises. The aggregate promise may
040 * be resolved from within the callback to short-circuit the promise.
041 * - rejected: (callable) Invoked when a promise is rejected. The
042 * function is invoked with three arguments: the rejection reason, the
043 * index position from the iterable list of the promise, and the
044 * aggregate promise that manages all of the promises. The aggregate
045 * promise may be resolved from within the callback to short-circuit
046 * the promise.
047 * - concurrency: (integer) Pass this configuration option to limit the
048 * allowed number of outstanding concurrently executing promises,
049 * creating a capped pool of promises. There is no limit by default.
050 *
051 * @param mixed $iterable Promises or values to iterate.
052 * @param array $config Configuration options
053 */
054 public function __construct($iterable, array $config = [])
055 {
056 $this->iterable = Create::iterFor($iterable);
057
058 if (isset($config['concurrency'])) {
059 $this->concurrency = $config['concurrency'];
060 }
061
062 if (isset($config['fulfilled'])) {
063 $this->onFulfilled = $config['fulfilled'];
064 }
065
066 if (isset($config['rejected'])) {
067 $this->onRejected = $config['rejected'];
068 }
069 }
070
071 /** @psalm-suppress InvalidNullableReturnType */
072 public function promise()
073 {
074 if ($this->aggregate) {
075 return $this->aggregate;
076 }
077
078 try {
079 $this->createPromise();
080 /** @psalm-assert Promise $this->aggregate */
081 $this->iterable->rewind();
082 $this->refillPending();
083 } catch (\Throwable $e) {
084 $this->aggregate->reject($e);
085 } catch (\Exception $e) {
086 $this->aggregate->reject($e);
087 }
088
089 /**
090 * @psalm-suppress NullableReturnStatement
091 * @phpstan-ignore-next-line
092 */
093 return $this->aggregate;
094 }
095
096 private function createPromise()
097 {
098 $this->mutex = false;
099 $this->aggregate = new Promise(function () {
100 if ($this->checkIfFinished()) {
101 return;
102 }
103 reset($this->pending);
104 // Consume a potentially fluctuating list of promises while
105 // ensuring that indexes are maintained (precluding array_shift).
106 while ($promise = current($this->pending)) {
107 next($this->pending);
108 $promise->wait();
109 if (Is::settled($this->aggregate)) {
110 return;
111 }
112 }
113 });
114
115 // Clear the references when the promise is resolved.
116 $clearFn = function () {
117 $this->iterable = $this->concurrency = $this->pending = null;
118 $this->onFulfilled = $this->onRejected = null;
119 $this->nextPendingIndex = 0;
120 };
121
122 $this->aggregate->then($clearFn, $clearFn);
123 }
124
125 private function refillPending()
126 {
127 if (!$this->concurrency) {
128 // Add all pending promises.
129 while ($this->addPending() && $this->advanceIterator());
130 return;
131 }
132
133 // Add only up to N pending promises.
134 $concurrency = is_callable($this->concurrency)
135 ? call_user_func($this->concurrency, count($this->pending))
136 : $this->concurrency;
137 $concurrency = max($concurrency - count($this->pending), 0);
138 // Concurrency may be set to 0 to disallow new promises.
139 if (!$concurrency) {
140 return;
141 }
142 // Add the first pending promise.
143 $this->addPending();
144 // Note this is special handling for concurrency=1 so that we do
145 // not advance the iterator after adding the first promise. This
146 // helps work around issues with generators that might not have the
147 // next value to yield until promise callbacks are called.
148 while (--$concurrency
149 && $this->advanceIterator()
150 && $this->addPending());
151 }
152
153 private function addPending()
154 {
155 if (!$this->iterable || !$this->iterable->valid()) {
156 return false;
157 }
158
159 $promise = Create::promiseFor($this->iterable->current());
160 $key = $this->iterable->key();
161
162 // Iterable keys may not be unique, so we use a counter to
163 // guarantee uniqueness
164 $idx = $this->nextPendingIndex++;
165
166 $this->pending[$idx] = $promise->then(
167 function ($value) use ($idx, $key) {
168 if ($this->onFulfilled) {
169 call_user_func(
170 $this->onFulfilled,
171 $value,
172 $key,
173 $this->aggregate
174 );
175 }
176 $this->step($idx);
177 },
178 function ($reason) use ($idx, $key) {
179 if ($this->onRejected) {
180 call_user_func(
181 $this->onRejected,
182 $reason,
183 $key,
184 $this->aggregate
185 );
186 }
187 $this->step($idx);
188 }
189 );
190
191 return true;
192 }
193
194 private function advanceIterator()
195 {
196 // Place a lock on the iterator so that we ensure to not recurse,
197 // preventing fatal generator errors.
198 if ($this->mutex) {
199 return false;
200 }
201
202 $this->mutex = true;
203
204 try {
205 $this->iterable->next();
206 $this->mutex = false;
207 return true;
208 } catch (\Throwable $e) {
209 $this->aggregate->reject($e);
210 $this->mutex = false;
211 return false;
212 } catch (\Exception $e) {
213 $this->aggregate->reject($e);
214 $this->mutex = false;
215 return false;
216 }
217 }
218
219 private function step($idx)
220 {
221 // If the promise was already resolved, then ignore this step.
222 if (Is::settled($this->aggregate)) {
223 return;
224 }
225
226 unset($this->pending[$idx]);
227
228 // Only refill pending promises if we are not locked, preventing the
229 // EachPromise to recursively invoke the provided iterator, which
230 // cause a fatal error: "Cannot resume an already running generator"
231 if ($this->advanceIterator() && !$this->checkIfFinished()) {
232 // Add more pending promises if possible.
233 $this->refillPending();
234 }
235 }
236
237 private function checkIfFinished()
238 {
239 if (!$this->pending && !$this->iterable->valid()) {
240 // Resolve the promise if there's nothing left to do.
241 $this->aggregate->resolve(null);
242 return true;
243 }
244
245 return false;
246 }
247 }
248