Verzeichnisstruktur phpBB-3.2.0


Veröffentlicht
06.01.2017

So funktioniert es


Auf das letzte Element klicken. Dies geht jeweils ein Schritt zurück

Auf das Icon klicken, dies öffnet das Verzeichnis. Nochmal klicken schließt das Verzeichnis.
Auf den Verzeichnisnamen klicken, dies zeigt nur das Verzeichnis mit Inhalt an

(Beispiel Datei-Icons)

Auf das Icon klicken um den Quellcode anzuzeigen

AsyncReadStream.php

Zuletzt modifiziert: 09.10.2024, 12:56 - Dateigröße: 7.40 KiB


001  <?php
002  namespace GuzzleHttp\Stream;
003   
004  /**
005   * Represents an asynchronous read-only stream that supports a drain event and
006   * pumping data from a source stream.
007   *
008   * The AsyncReadStream can be used as a completely asynchronous stream, meaning
009   * the data you can read from the stream will immediately return only
010   * the data that is currently buffered.
011   *
012   * AsyncReadStream can also be used in a "blocking" manner if a "pump" function
013   * is provided. When a caller requests more bytes than are available in the
014   * buffer, then the pump function is used to block until the requested number
015   * of bytes are available or the remote source stream has errored, closed, or
016   * timed-out. This behavior isn't strictly "blocking" because the pump function
017   * can send other transfers while waiting on the desired buffer size to be
018   * ready for reading (e.g., continue to tick an event loop).
019   *
020   * @unstable This class is subject to change.
021   */
022  class AsyncReadStream implements StreamInterface
023  {
024      use StreamDecoratorTrait;
025   
026      /** @var callable|null Fn used to notify writers the buffer has drained */
027      private $drain;
028   
029      /** @var callable|null Fn used to block for more data */
030      private $pump;
031   
032      /** @var int|null Highwater mark of the underlying buffer */
033      private $hwm;
034   
035      /** @var bool Whether or not drain needs to be called at some point */
036      private $needsDrain;
037   
038      /** @var int The expected size of the remote source */
039      private $size;
040   
041      /**
042       * In order to utilize high water marks to tell writers to slow down, the
043       * provided stream must answer to the "hwm" stream metadata variable,
044       * providing the high water mark. If no "hwm" metadata value is available,
045       * then the "drain" functionality is not utilized.
046       *
047       * This class accepts an associative array of configuration options.
048       *
049       * - drain: (callable) Function to invoke when the stream has drained,
050       *   meaning the buffer is now writable again because the size of the
051       *   buffer is at an acceptable level (e.g., below the high water mark).
052       *   The function accepts a single argument, the buffer stream object that
053       *   has drained.
054       * - pump: (callable) A function that accepts the number of bytes to read
055       *   from the source stream. This function will block until all of the data
056       *   that was requested has been read, EOF of the source stream, or the
057       *   source stream is closed.
058       * - size: (int) The expected size in bytes of the data that will be read
059       *   (if known up-front).
060       *
061       * @param StreamInterface $buffer   Buffer that contains the data that has
062       *                                  been read by the event loop.
063       * @param array           $config   Associative array of options.
064       *
065       * @throws \InvalidArgumentException if the buffer is not readable and
066       *                                   writable.
067       */
068      public function __construct(
069          StreamInterface $buffer,
070          array $config = []
071      ) {
072          if (!$buffer->isReadable() || !$buffer->isWritable()) {
073              throw new \InvalidArgumentException(
074                  'Buffer must be readable and writable'
075              );
076          }
077   
078          if (isset($config['size'])) {
079              $this->size = $config['size'];
080          }
081   
082          static $callables = ['pump', 'drain'];
083          foreach ($callables as $check) {
084              if (isset($config[$check])) {
085                  if (!is_callable($config[$check])) {
086                      throw new \InvalidArgumentException(
087                          $check . ' must be callable'
088                      );
089                  }
090                  $this->{$check} = $config[$check];
091              }
092          }
093   
094          $this->hwm = $buffer->getMetadata('hwm');
095   
096          // Cannot drain when there's no high water mark.
097          if ($this->hwm === null) {
098              $this->drain = null;
099          }
100   
101          $this->stream = $buffer;
102      }
103   
104      /**
105       * Factory method used to create new async stream and an underlying buffer
106       * if no buffer is provided.
107       *
108       * This function accepts the same options as AsyncReadStream::__construct,
109       * but added the following key value pairs:
110       *
111       * - buffer: (StreamInterface) Buffer used to buffer data. If none is
112       *   provided, a default buffer is created.
113       * - hwm: (int) High water mark to use if a buffer is created on your
114       *   behalf.
115       * - max_buffer: (int) If provided, wraps the utilized buffer in a
116       *   DroppingStream decorator to ensure that buffer does not exceed a given
117       *   length. When exceeded, the stream will begin dropping data. Set the
118       *   max_buffer to 0, to use a NullStream which does not store data.
119       * - write: (callable) A function that is invoked when data is written
120       *   to the underlying buffer. The function accepts the buffer as the first
121       *   argument, and the data being written as the second. The function MUST
122       *   return the number of bytes that were written or false to let writers
123       *   know to slow down.
124       * - drain: (callable) See constructor documentation.
125       * - pump: (callable) See constructor documentation.
126       *
127       * @param array $options Associative array of options.
128       *
129       * @return array Returns an array containing the buffer used to buffer
130       *               data, followed by the ready to use AsyncReadStream object.
131       */
132      public static function create(array $options = [])
133      {
134          $maxBuffer = isset($options['max_buffer'])
135              ? $options['max_buffer']
136              : null;
137   
138          if ($maxBuffer === 0) {
139              $buffer = new NullStream();
140          } elseif (isset($options['buffer'])) {
141              $buffer = $options['buffer'];
142          } else {
143              $hwm = isset($options['hwm']) ? $options['hwm'] : 16384;
144              $buffer = new BufferStream($hwm);
145          }
146   
147          if ($maxBuffer > 0) {
148              $buffer = new DroppingStream($buffer, $options['max_buffer']);
149          }
150   
151          // Call the on_write callback if an on_write function was provided.
152          if (isset($options['write'])) {
153              $onWrite = $options['write'];
154              $buffer = FnStream::decorate($buffer, [
155                  'write' => function ($string) use ($buffer, $onWrite) {
156                      $result = $buffer->write($string);
157                      $onWrite($buffer, $string);
158                      return $result;
159                  }
160              ]);
161          }
162   
163          return [$buffer, new self($buffer, $options)];
164      }
165   
166      public function getSize()
167      {
168          return $this->size;
169      }
170   
171      public function isWritable()
172      {
173          return false;
174      }
175   
176      public function write($string)
177      {
178          return false;
179      }
180   
181      public function read($length)
182      {
183          if (!$this->needsDrain && $this->drain) {
184              $this->needsDrain = $this->stream->getSize() >= $this->hwm;
185          }
186   
187          $result = $this->stream->read($length);
188   
189          // If we need to drain, then drain when the buffer is empty.
190          if ($this->needsDrain && $this->stream->getSize() === 0) {
191              $this->needsDrain = false;
192              $drainFn = $this->drain;
193              $drainFn($this->stream);
194          }
195   
196          $resultLen = strlen($result);
197   
198          // If a pump was provided, the buffer is still open, and not enough
199          // data was given, then block until the data is provided.
200          if ($this->pump && $resultLen < $length) {
201              $pumpFn = $this->pump;
202              $result .= $pumpFn($length - $resultLen);
203          }
204   
205          return $result;
206      }
207  }
208