Verzeichnisstruktur phpBB-3.2.0


Veröffentlicht
06.01.2017

So funktioniert es


Auf das letzte Element klicken. Dies geht jeweils ein Schritt zurück

Auf das Icon klicken, dies öffnet das Verzeichnis. Nochmal klicken schließt das Verzeichnis.
Auf den Verzeichnisnamen klicken, dies zeigt nur das Verzeichnis mit Inhalt an

(Beispiel Datei-Icons)

Auf das Icon klicken um den Quellcode anzuzeigen

Pool.php

Zuletzt modifiziert: 09.10.2024, 12:56 - Dateigröße: 10.87 KiB


001  <?php
002  namespace GuzzleHttp;
003   
004  use GuzzleHttp\Event\BeforeEvent;
005  use GuzzleHttp\Event\RequestEvents;
006  use GuzzleHttp\Message\RequestInterface;
007  use GuzzleHttp\Message\ResponseInterface;
008  use GuzzleHttp\Ring\Core;
009  use GuzzleHttp\Ring\Future\FutureInterface;
010  use GuzzleHttp\Event\ListenerAttacherTrait;
011  use GuzzleHttp\Event\EndEvent;
012  use React\Promise\Deferred;
013  use React\Promise\FulfilledPromise;
014  use React\Promise\PromiseInterface;
015  use React\Promise\RejectedPromise;
016   
017  /**
018   * Sends and iterator of requests concurrently using a capped pool size.
019   *
020   * The Pool object implements FutureInterface, meaning it can be used later
021   * when necessary, the requests provided to the pool can be cancelled, and
022   * you can check the state of the pool to know if it has been dereferenced
023   * (sent) or has been cancelled.
024   *
025   * When sending the pool, keep in mind that no results are returned: callers
026   * are expected to handle results asynchronously using Guzzle's event system.
027   * When requests complete, more are added to the pool to ensure that the
028   * requested pool size is always filled as much as possible.
029   *
030   * IMPORTANT: Do not provide a pool size greater that what the utilized
031   * underlying RingPHP handler can support. This will result is extremely poor
032   * performance.
033   */
034  class Pool implements FutureInterface
035  {
036      use ListenerAttacherTrait;
037   
038      /** @var \GuzzleHttp\ClientInterface */
039      private $client;
040   
041      /** @var \Iterator Yields requests */
042      private $iter;
043   
044      /** @var Deferred */
045      private $deferred;
046   
047      /** @var PromiseInterface */
048      private $promise;
049   
050      private $waitQueue = [];
051      private $eventListeners = [];
052      private $poolSize;
053      private $isRealized = false;
054   
055      /**
056       * The option values for 'before', 'complete', 'error' and 'end' can be a
057       * callable, an associative array containing event data, or an array of
058       * event data arrays. Event data arrays contain the following keys:
059       *
060       * - fn: callable to invoke that receives the event
061       * - priority: Optional event priority (defaults to 0)
062       * - once: Set to true so that the event is removed after it is triggered
063       *
064       * @param ClientInterface $client   Client used to send the requests.
065       * @param array|\Iterator $requests Requests to send in parallel
066       * @param array           $options  Associative array of options
067       *     - pool_size: (callable|int)   Maximum number of requests to send
068       *                                   concurrently, or a callback that receives
069       *                                   the current queue size and returns the
070       *                                   number of new requests to send
071       *     - before:    (callable|array) Receives a BeforeEvent
072       *     - complete:  (callable|array) Receives a CompleteEvent
073       *     - error:     (callable|array) Receives a ErrorEvent
074       *     - end:       (callable|array) Receives an EndEvent
075       */
076      public function __construct(
077          ClientInterface $client,
078          $requests,
079          array $options = []
080      ) {
081          $this->client = $client;
082          $this->iter = $this->coerceIterable($requests);
083          $this->deferred = new Deferred();
084          $this->promise = $this->deferred->promise();
085          $this->poolSize = isset($options['pool_size'])
086              ? $options['pool_size'] : 25;
087          $this->eventListeners = $this->prepareListeners(
088              $options,
089              ['before', 'complete', 'error', 'end']
090          );
091      }
092   
093      /**
094       * Sends multiple requests in parallel and returns an array of responses
095       * and exceptions that uses the same ordering as the provided requests.
096       *
097       * IMPORTANT: This method keeps every request and response in memory, and
098       * as such, is NOT recommended when sending a large number or an
099       * indeterminate number of requests concurrently.
100       *
101       * @param ClientInterface $client   Client used to send the requests
102       * @param array|\Iterator $requests Requests to send in parallel
103       * @param array           $options  Passes through the options available in
104       *                                  {@see GuzzleHttp\Pool::__construct}
105       *
106       * @return BatchResults Returns a container for the results.
107       * @throws \InvalidArgumentException if the event format is incorrect.
108       */
109      public static function batch(
110          ClientInterface $client,
111          $requests,
112          array $options = []
113      ) {
114          $hash = new \SplObjectStorage();
115          foreach ($requests as $request) {
116              $hash->attach($request);
117          }
118   
119          // In addition to the normally run events when requests complete, add
120          // and event to continuously track the results of transfers in the hash.
121          (new self($client, $requests, RequestEvents::convertEventArray(
122              $options,
123              ['end'],
124              [
125                  'priority' => RequestEvents::LATE,
126                  'fn'       => function (EndEvent $e) use ($hash) {
127                      $hash[$e->getRequest()] = $e->getException()
128                          ? $e->getException()
129                          : $e->getResponse();
130                  }
131              ]
132          )))->wait();
133   
134          return new BatchResults($hash);
135      }
136   
137      /**
138       * Creates a Pool and immediately sends the requests.
139       *
140       * @param ClientInterface $client   Client used to send the requests
141       * @param array|\Iterator $requests Requests to send in parallel
142       * @param array           $options  Passes through the options available in
143       *                                  {@see GuzzleHttp\Pool::__construct}
144       */
145      public static function send(
146          ClientInterface $client,
147          $requests,
148          array $options = []
149      ) {
150          $pool = new self($client, $requests, $options);
151          $pool->wait();
152      }
153   
154      private function getPoolSize()
155      {
156          return is_callable($this->poolSize)
157              ? call_user_func($this->poolSize, count($this->waitQueue))
158              : $this->poolSize;
159      }
160   
161      /**
162       * Add as many requests as possible up to the current pool limit.
163       */
164      private function addNextRequests()
165      {
166          $limit = max($this->getPoolSize() - count($this->waitQueue), 0);
167          while ($limit--) {
168              if (!$this->addNextRequest()) {
169                  break;
170              }
171          }
172      }
173   
174      public function wait()
175      {
176          if ($this->isRealized) {
177              return false;
178          }
179   
180          // Seed the pool with N number of requests.
181          $this->addNextRequests();
182   
183          // Stop if the pool was cancelled while transferring requests.
184          if ($this->isRealized) {
185              return false;
186          }
187   
188          // Wait on any outstanding FutureResponse objects.
189          while ($response = array_pop($this->waitQueue)) {
190              try {
191                  $response->wait();
192              } catch (\Exception $e) {
193                  // Eat exceptions because they should be handled asynchronously
194              }
195              $this->addNextRequests();
196          }
197   
198          // Clean up no longer needed state.
199          $this->isRealized = true;
200          $this->waitQueue = $this->eventListeners = [];
201          $this->client = $this->iter = null;
202          $this->deferred->resolve(true);
203   
204          return true;
205      }
206   
207      /**
208       * {@inheritdoc}
209       *
210       * Attempt to cancel all outstanding requests (requests that are queued for
211       * dereferencing). Returns true if all outstanding requests can be
212       * cancelled.
213       *
214       * @return bool
215       */
216      public function cancel()
217      {
218          if ($this->isRealized) {
219              return false;
220          }
221   
222          $success = $this->isRealized = true;
223          foreach ($this->waitQueue as $response) {
224              if (!$response->cancel()) {
225                  $success = false;
226              }
227          }
228   
229          return $success;
230      }
231   
232      /**
233       * Returns a promise that is invoked when the pool completed. There will be
234       * no passed value.
235       *
236       * {@inheritdoc}
237       */
238      public function then(
239          callable $onFulfilled = null,
240          callable $onRejected = null,
241          callable $onProgress = null
242      ) {
243          return $this->promise->then($onFulfilled, $onRejected, $onProgress);
244      }
245   
246      public function promise()
247      {
248          return $this->promise;
249      }
250   
251      private function coerceIterable($requests)
252      {
253          if ($requests instanceof \Iterator) {
254              return $requests;
255          } elseif (is_array($requests)) {
256              return new \ArrayIterator($requests);
257          }
258   
259          throw new \InvalidArgumentException('Expected Iterator or array. '
260              . 'Found ' . Core::describeType($requests));
261      }
262   
263      /**
264       * Adds the next request to pool and tracks what requests need to be
265       * dereferenced when completing the pool.
266       */
267      private function addNextRequest()
268      {
269          add_next:
270   
271          if ($this->isRealized || !$this->iter || !$this->iter->valid()) {
272              return false;
273          }
274   
275          $request = $this->iter->current();
276          $this->iter->next();
277   
278          if (!($request instanceof RequestInterface)) {
279              throw new \InvalidArgumentException(sprintf(
280                  'All requests in the provided iterator must implement '
281                  . 'RequestInterface. Found %s',
282                  Core::describeType($request)
283              ));
284          }
285   
286          // Be sure to use "lazy" futures, meaning they do not send right away.
287          $request->getConfig()->set('future', 'lazy');
288          $hash = spl_object_hash($request);
289          $this->attachListeners($request, $this->eventListeners);
290          $request->getEmitter()->on('before', [$this, '_trackRetries'], RequestEvents::EARLY);
291          $response = $this->client->send($request);
292          $this->waitQueue[$hash] = $response;
293          $promise = $response->promise();
294   
295          // Don't recursively call itself for completed or rejected responses.
296          if ($promise instanceof FulfilledPromise
297              || $promise instanceof RejectedPromise
298          ) {
299              try {
300                  $this->finishResponse($request, $response->wait(), $hash);
301              } catch (\Exception $e) {
302                  $this->finishResponse($request, $e, $hash);
303              }
304              goto add_next;
305          }
306   
307          // Use this function for both resolution and rejection.
308          $thenFn = function ($value) use ($request, $hash) {
309              $this->finishResponse($request, $value, $hash);
310              if (!$request->getConfig()->get('_pool_retries')) {
311                  $this->addNextRequests();
312              }
313          };
314   
315          $promise->then($thenFn, $thenFn);
316   
317          return true;
318      }
319   
320      public function _trackRetries(BeforeEvent $e)
321      {
322          $e->getRequest()->getConfig()->set('_pool_retries', $e->getRetryCount());
323      }
324   
325      private function finishResponse($request, $value, $hash)
326      {
327          unset($this->waitQueue[$hash]);
328          $result = $value instanceof ResponseInterface
329              ? ['request' => $request, 'response' => $value, 'error' => null]
330              : ['request' => $request, 'response' => null, 'error' => $value];
331          $this->deferred->notify($result);
332      }
333  }
334