Verzeichnisstruktur phpBB-3.2.0
- Veröffentlicht
- 06.01.2017
So funktioniert es
|
Auf das letzte Element klicken. Dies geht jeweils ein Schritt zurück |
Auf das Icon klicken, dies öffnet das Verzeichnis. Nochmal klicken schließt das Verzeichnis. |
|
(Beispiel Datei-Icons)
|
Auf das Icon klicken um den Quellcode anzuzeigen |
Pool.php
001 <?php
002 namespace GuzzleHttp;
003
004 use GuzzleHttp\Event\BeforeEvent;
005 use GuzzleHttp\Event\RequestEvents;
006 use GuzzleHttp\Message\RequestInterface;
007 use GuzzleHttp\Message\ResponseInterface;
008 use GuzzleHttp\Ring\Core;
009 use GuzzleHttp\Ring\Future\FutureInterface;
010 use GuzzleHttp\Event\ListenerAttacherTrait;
011 use GuzzleHttp\Event\EndEvent;
012 use React\Promise\Deferred;
013 use React\Promise\FulfilledPromise;
014 use React\Promise\PromiseInterface;
015 use React\Promise\RejectedPromise;
016
017 /**
018 * Sends and iterator of requests concurrently using a capped pool size.
019 *
020 * The Pool object implements FutureInterface, meaning it can be used later
021 * when necessary, the requests provided to the pool can be cancelled, and
022 * you can check the state of the pool to know if it has been dereferenced
023 * (sent) or has been cancelled.
024 *
025 * When sending the pool, keep in mind that no results are returned: callers
026 * are expected to handle results asynchronously using Guzzle's event system.
027 * When requests complete, more are added to the pool to ensure that the
028 * requested pool size is always filled as much as possible.
029 *
030 * IMPORTANT: Do not provide a pool size greater that what the utilized
031 * underlying RingPHP handler can support. This will result is extremely poor
032 * performance.
033 */
034 class Pool implements FutureInterface
035 {
036 use ListenerAttacherTrait;
037
038 /** @var \GuzzleHttp\ClientInterface */
039 private $client;
040
041 /** @var \Iterator Yields requests */
042 private $iter;
043
044 /** @var Deferred */
045 private $deferred;
046
047 /** @var PromiseInterface */
048 private $promise;
049
050 private $waitQueue = [];
051 private $eventListeners = [];
052 private $poolSize;
053 private $isRealized = false;
054
055 /**
056 * The option values for 'before', 'complete', 'error' and 'end' can be a
057 * callable, an associative array containing event data, or an array of
058 * event data arrays. Event data arrays contain the following keys:
059 *
060 * - fn: callable to invoke that receives the event
061 * - priority: Optional event priority (defaults to 0)
062 * - once: Set to true so that the event is removed after it is triggered
063 *
064 * @param ClientInterface $client Client used to send the requests.
065 * @param array|\Iterator $requests Requests to send in parallel
066 * @param array $options Associative array of options
067 * - pool_size: (callable|int) Maximum number of requests to send
068 * concurrently, or a callback that receives
069 * the current queue size and returns the
070 * number of new requests to send
071 * - before: (callable|array) Receives a BeforeEvent
072 * - complete: (callable|array) Receives a CompleteEvent
073 * - error: (callable|array) Receives a ErrorEvent
074 * - end: (callable|array) Receives an EndEvent
075 */
076 public function __construct(
077 ClientInterface $client,
078 $requests,
079 array $options = []
080 ) {
081 $this->client = $client;
082 $this->iter = $this->coerceIterable($requests);
083 $this->deferred = new Deferred();
084 $this->promise = $this->deferred->promise();
085 $this->poolSize = isset($options['pool_size'])
086 ? $options['pool_size'] : 25;
087 $this->eventListeners = $this->prepareListeners(
088 $options,
089 ['before', 'complete', 'error', 'end']
090 );
091 }
092
093 /**
094 * Sends multiple requests in parallel and returns an array of responses
095 * and exceptions that uses the same ordering as the provided requests.
096 *
097 * IMPORTANT: This method keeps every request and response in memory, and
098 * as such, is NOT recommended when sending a large number or an
099 * indeterminate number of requests concurrently.
100 *
101 * @param ClientInterface $client Client used to send the requests
102 * @param array|\Iterator $requests Requests to send in parallel
103 * @param array $options Passes through the options available in
104 * {@see GuzzleHttp\Pool::__construct}
105 *
106 * @return BatchResults Returns a container for the results.
107 * @throws \InvalidArgumentException if the event format is incorrect.
108 */
109 public static function batch(
110 ClientInterface $client,
111 $requests,
112 array $options = []
113 ) {
114 $hash = new \SplObjectStorage();
115 foreach ($requests as $request) {
116 $hash->attach($request);
117 }
118
119 // In addition to the normally run events when requests complete, add
120 // and event to continuously track the results of transfers in the hash.
121 (new self($client, $requests, RequestEvents::convertEventArray(
122 $options,
123 ['end'],
124 [
125 'priority' => RequestEvents::LATE,
126 'fn' => function (EndEvent $e) use ($hash) {
127 $hash[$e->getRequest()] = $e->getException()
128 ? $e->getException()
129 : $e->getResponse();
130 }
131 ]
132 )))->wait();
133
134 return new BatchResults($hash);
135 }
136
137 /**
138 * Creates a Pool and immediately sends the requests.
139 *
140 * @param ClientInterface $client Client used to send the requests
141 * @param array|\Iterator $requests Requests to send in parallel
142 * @param array $options Passes through the options available in
143 * {@see GuzzleHttp\Pool::__construct}
144 */
145 public static function send(
146 ClientInterface $client,
147 $requests,
148 array $options = []
149 ) {
150 $pool = new self($client, $requests, $options);
151 $pool->wait();
152 }
153
154 private function getPoolSize()
155 {
156 return is_callable($this->poolSize)
157 ? call_user_func($this->poolSize, count($this->waitQueue))
158 : $this->poolSize;
159 }
160
161 /**
162 * Add as many requests as possible up to the current pool limit.
163 */
164 private function addNextRequests()
165 {
166 $limit = max($this->getPoolSize() - count($this->waitQueue), 0);
167 while ($limit--) {
168 if (!$this->addNextRequest()) {
169 break;
170 }
171 }
172 }
173
174 public function wait()
175 {
176 if ($this->isRealized) {
177 return false;
178 }
179
180 // Seed the pool with N number of requests.
181 $this->addNextRequests();
182
183 // Stop if the pool was cancelled while transferring requests.
184 if ($this->isRealized) {
185 return false;
186 }
187
188 // Wait on any outstanding FutureResponse objects.
189 while ($response = array_pop($this->waitQueue)) {
190 try {
191 $response->wait();
192 } catch (\Exception $e) {
193 // Eat exceptions because they should be handled asynchronously
194 }
195 $this->addNextRequests();
196 }
197
198 // Clean up no longer needed state.
199 $this->isRealized = true;
200 $this->waitQueue = $this->eventListeners = [];
201 $this->client = $this->iter = null;
202 $this->deferred->resolve(true);
203
204 return true;
205 }
206
207 /**
208 * {@inheritdoc}
209 *
210 * Attempt to cancel all outstanding requests (requests that are queued for
211 * dereferencing). Returns true if all outstanding requests can be
212 * cancelled.
213 *
214 * @return bool
215 */
216 public function cancel()
217 {
218 if ($this->isRealized) {
219 return false;
220 }
221
222 $success = $this->isRealized = true;
223 foreach ($this->waitQueue as $response) {
224 if (!$response->cancel()) {
225 $success = false;
226 }
227 }
228
229 return $success;
230 }
231
232 /**
233 * Returns a promise that is invoked when the pool completed. There will be
234 * no passed value.
235 *
236 * {@inheritdoc}
237 */
238 public function then(
239 callable $onFulfilled = null,
240 callable $onRejected = null,
241 callable $onProgress = null
242 ) {
243 return $this->promise->then($onFulfilled, $onRejected, $onProgress);
244 }
245
246 public function promise()
247 {
248 return $this->promise;
249 }
250
251 private function coerceIterable($requests)
252 {
253 if ($requests instanceof \Iterator) {
254 return $requests;
255 } elseif (is_array($requests)) {
256 return new \ArrayIterator($requests);
257 }
258
259 throw new \InvalidArgumentException('Expected Iterator or array. '
260 . 'Found ' . Core::describeType($requests));
261 }
262
263 /**
264 * Adds the next request to pool and tracks what requests need to be
265 * dereferenced when completing the pool.
266 */
267 private function addNextRequest()
268 {
269 add_next:
270
271 if ($this->isRealized || !$this->iter || !$this->iter->valid()) {
272 return false;
273 }
274
275 $request = $this->iter->current();
276 $this->iter->next();
277
278 if (!($request instanceof RequestInterface)) {
279 throw new \InvalidArgumentException(sprintf(
280 'All requests in the provided iterator must implement '
281 . 'RequestInterface. Found %s',
282 Core::describeType($request)
283 ));
284 }
285
286 // Be sure to use "lazy" futures, meaning they do not send right away.
287 $request->getConfig()->set('future', 'lazy');
288 $hash = spl_object_hash($request);
289 $this->attachListeners($request, $this->eventListeners);
290 $request->getEmitter()->on('before', [$this, '_trackRetries'], RequestEvents::EARLY);
291 $response = $this->client->send($request);
292 $this->waitQueue[$hash] = $response;
293 $promise = $response->promise();
294
295 // Don't recursively call itself for completed or rejected responses.
296 if ($promise instanceof FulfilledPromise
297 || $promise instanceof RejectedPromise
298 ) {
299 try {
300 $this->finishResponse($request, $response->wait(), $hash);
301 } catch (\Exception $e) {
302 $this->finishResponse($request, $e, $hash);
303 }
304 goto add_next;
305 }
306
307 // Use this function for both resolution and rejection.
308 $thenFn = function ($value) use ($request, $hash) {
309 $this->finishResponse($request, $value, $hash);
310 if (!$request->getConfig()->get('_pool_retries')) {
311 $this->addNextRequests();
312 }
313 };
314
315 $promise->then($thenFn, $thenFn);
316
317 return true;
318 }
319
320 public function _trackRetries(BeforeEvent $e)
321 {
322 $e->getRequest()->getConfig()->set('_pool_retries', $e->getRetryCount());
323 }
324
325 private function finishResponse($request, $value, $hash)
326 {
327 unset($this->waitQueue[$hash]);
328 $result = $value instanceof ResponseInterface
329 ? ['request' => $request, 'response' => $value, 'error' => null]
330 : ['request' => $request, 'response' => null, 'error' => $value];
331 $this->deferred->notify($result);
332 }
333 }
334