Hoa central
Stream.php
Go to the documentation of this file.
1 <?php
2 
37 namespace Hoa\Stream;
38 
39 use Hoa\Core;
40 
49 abstract class Stream implements Core\Event\Listenable
50 {
/**
 * Index of the stream name inside a register entry / bucket.
 */
56  const NAME = 0;
57 
/**
 * Index of the stream handler (the Stream instance that created the entry)
 * inside a register entry / bucket.
 */
63  const HANDLER = 1;
64 
/**
 * Index of the value returned by _open() inside a register entry / bucket.
 */
70  const RESOURCE = 2;
71 
/**
 * Index of the resolved context (or null) inside a register entry / bucket.
 */
77  const CONTEXT = 3;
78 
/**
 * Copy of the register entry of this stream, filled by open(); empty until
 * the stream has been opened.
 */
84  protected $_bucket = [];
85 
/**
 * Register of all streams, keyed by the MD5 hash of the stream name. Each
 * entry is an array indexed by the NAME, HANDLER, RESOURCE and CONTEXT
 * constants.
 */
91  private static $_register = [];
92 
/**
 * Buffer size last successfully applied through setStreamBuffer()
 * (8192 until then).
 */
98  protected $_bufferSize = 8192;
99 
/**
 * Stream name given to the constructor.
 */
105  protected $_streamName = null;
106 
/**
 * Context identifier given to the constructor (null when none).
 */
112  protected $_context = null;
113 
/**
 * Value of the constructor's $wait argument: true when the opening has been
 * deferred to an explicit call to open().
 */
119  protected $_hasBeenDiffered = false;
120 
/**
 * Event listener created by the constructor; callables are attached to it
 * through on() and fired from _notify().
 */
126  protected $_on = null;
127 
/**
 * Set to true when this handler obtained a register entry that already
 * existed (i.e. the stream was first registered by another open).
 */
133  protected $_borrowed = false;
134 
135 
136 
/**
 * Set up the stream and, unless asked to wait, open it right away.
 *
 * @param   string  $streamName    Stream name (e.g. path or URL).
 * @param   string  $context       Context ID (please, see the
 *                                 Context class), or null for none.
 * @param   bool    $wait          When strictly true, the stream is not
 *                                 opened here; the caller must call open()
 *                                 later.
 */
public function __construct($streamName, $context = null, $wait = false)
{
    // Listener IDs that can be attached to through on().
    $listenerIds = [
        'authrequire',
        'authresult',
        'complete',
        'connect',
        'failure',
        'mimetype',
        'progress',
        'redirect',
        'resolve',
        'size'
    ];

    $this->_streamName      = $streamName;
    $this->_context         = $context;
    $this->_hasBeenDiffered = $wait;
    $this->_on              = new Core\Event\Listener($this, $listenerIds);

    if (true !== $wait) {
        $this->open();
    }
}
174 
/**
 * Get the register entry of a stream, creating (and opening) it when it is
 * not registered yet.
 *
 * The `final` modifier has been dropped: a private method can never be
 * overridden, and PHP 8.0+ emits a warning for `final private` methods.
 *
 * @param   string  $streamName    Stream name.
 * @param   Stream  $handler       Handler asking for the stream; it is used
 *                                 to open the stream and is flagged as
 *                                 borrowing when the entry already exists.
 * @param   string  $context       Context ID, or null for none.
 * @return  array   Register entry, indexed by the NAME, HANDLER, RESOURCE
 *                  and CONTEXT constants (returned by reference).
 * @throws  Exception  When a context ID is given but was never declared.
 */
private static function &_getStream(
    $streamName,
    Stream $handler,
    $context = null
) {
    $name = md5($streamName);

    if (null !== $context) {
        if (false === Context::contextExists($context)) {
            throw new Exception(
                'Context %s was not previously declared, cannot retrieve ' .
                'this context.',
                0,
                $context
            );
        }

        // From now on, $context is the context instance, not its ID.
        $context = Context::getInstance($context);
    }

    if (!isset(self::$_register[$name])) {
        self::$_register[$name] = [
            self::NAME     => $streamName,
            self::HANDLER  => $handler,
            self::RESOURCE => $handler->_open($streamName, $context),
            self::CONTEXT  => $context
        ];
        Core\Event::register(
            'hoa://Event/Stream/' . $streamName,
            $handler
        );
        // Add :open-ready?
        Core\Event::register(
            'hoa://Event/Stream/' . $streamName . ':close-before',
            $handler
        );
    } else {
        // The entry was created by a previous open: this handler borrows it.
        $handler->_borrowed = true;
    }

    // The entry exists but holds no resource: (re-)open it with this handler.
    if (null === self::$_register[$name][self::RESOURCE]) {
        self::$_register[$name][self::RESOURCE]
            = $handler->_open($streamName, $context);
    }

    return self::$_register[$name];
}
234 
/**
 * Open the stream and return the resource (by reference).
 * Called by _getStream() with the stream name and the resolved context
 * instance (or null); the returned value is stored in the register entry
 * under the RESOURCE index.
 *
 * @param   string   $streamName    Stream name.
 * @param   Context  $context       Context instance, or null.
 * @return  resource
 */
245  abstract protected function &_open($streamName, Context $context = null);
246 
/**
 * Close the current stream.
 * Called by close(); returning exactly false aborts the closing, i.e. the
 * stream stays registered.
 *
 * @return  bool
 */
254  abstract protected function _close();
255 
/**
 * Open the stream and keep a copy of its register entry in the bucket.
 *
 * When the opening has been deferred, a notification callback pointing to
 * _notify() is installed on the context: on a brand new context if none was
 * given, or on the given (existing) context if it has no 'notification'
 * parameter yet.
 *
 * @return  Stream  This instance.
 * @throws  Exception  From _getStream(), when the context ID is unknown.
 */
final public function open()
{
    $contextId = $this->_context;

    if (true === $this->_hasBeenDiffered) {
        $notification = ['notification' => [$this, '_notify']];

        if (null === $contextId) {
            // No context given: create one just to carry the callback.
            $handle = Context::getInstance(uniqid());
            $handle->setParameters($notification);
            $contextId = $handle->getId();
        } elseif (true === Context::contextExists($contextId)) {
            $handle     = Context::getInstance($contextId);
            $parameters = $handle->getParameters();

            // Never override a callback set by the user.
            if (!isset($parameters['notification'])) {
                $handle->setParameters($notification);
            }
        }
    }

    $this->_bucket = self::_getStream($this->_streamName, $this, $contextId);

    return $this;
}
293 
/**
 * Close the current stream and remove it from the register.
 *
 * Nothing happens when the stream has never been opened, when it is no
 * longer registered, or when _close() returns exactly false. Otherwise the
 * ':close-before' event is notified before _close() is called, then the
 * register entry, the listener and the events of the stream are released.
 *
 * @return  void
 */
final public function close()
{
    $streamName = $this->getStreamName();

    // Never opened (empty bucket): there is nothing to close. This also
    // avoids hashing null, which is deprecated since PHP 8.1 and would
    // wrongly target the register entry of a stream named ''.
    if (null === $streamName) {
        return;
    }

    $name = md5($streamName);

    if (!isset(self::$_register[$name])) {
        return;
    }

    Core\Event::notify(
        'hoa://Event/Stream/' . $streamName . ':close-before',
        $this,
        new Core\Event\Bucket()
    );

    // The concrete stream refused to close: keep it registered.
    if (false === $this->_close()) {
        return;
    }

    unset(self::$_register[$name]);
    $this->_bucket[self::HANDLER] = null;
    unset($this->_on);
    Core\Event::unregister(
        'hoa://Event/Stream/' . $streamName
    );
    Core\Event::unregister(
        'hoa://Event/Stream/' . $streamName . ':close-before'
    );

    return;
}
330 
/**
 * Get the name of the opened stream.
 *
 * @return  string  Stream name, or null while the bucket is empty (stream
 *                  not opened yet).
 */
public function getStreamName()
{
    return empty($this->_bucket)
        ? null
        : $this->_bucket[self::NAME];
}
344 
/**
 * Get the resource of the opened stream.
 *
 * @return  resource  Value stored under the RESOURCE index of the bucket,
 *                    or null while the bucket is empty.
 */
protected function getStream()
{
    return empty($this->_bucket)
        ? null
        : $this->_bucket[self::RESOURCE];
}
358 
/**
 * Get the context of the opened stream.
 *
 * @return  Context  Value stored under the CONTEXT index of the bucket
 *                   (may itself be null), or null while the bucket is empty.
 */
public function getStreamContext()
{
    return empty($this->_bucket)
        ? null
        : $this->_bucket[self::CONTEXT];
}
372 
/**
 * Get the handler registered for a stream name.
 *
 * @param   string  $streamName    Stream name.
 * @return  Stream  Handler stored in the register, or null when no stream
 *                  is registered under this name.
 */
public static function getStreamHandler($streamName)
{
    $key = md5($streamName);

    return isset(self::$_register[$key])
        ? self::$_register[$key][self::HANDLER]
        : null;
}
389 
/**
 * Replace the resource held by this stream's bucket.
 *
 * The new value must be a resource. A closed resource is tolerated, as the
 * original check intended: is_resource() returns false for it, so it is
 * recognised through its type name instead.
 *
 * The original condition required `false === is_resource()` and
 * `'resource' === gettype()` to hold together, which never happens; invalid
 * values were therefore silently accepted.
 *
 * @param   resource  $stream    New resource.
 * @return  resource  The previous resource.
 * @throws  Exception  When $stream is neither an open nor a closed resource.
 */
public function _setStream($stream)
{
    if (false === is_resource($stream)) {
        $type = gettype($stream);

        // gettype() reports closed resources as 'resource (closed)' since
        // PHP 7.2 and as 'unknown type' before; get_resource_type() names
        // them 'Unknown'. It is only called on a value typed 'resource'.
        $isClosedResource =
            'resource (closed)' === $type ||
            'unknown type' === $type ||
            ('resource' === $type &&
             'Unknown' === get_resource_type($stream));

        if (false === $isClosedResource) {
            throw new Exception(
                'Try to change the stream resource with an invalid one; ' .
                'given %s.',
                1,
                $type
            );
        }
    }

    $old                           = $this->_bucket[self::RESOURCE];
    $this->_bucket[self::RESOURCE] = $stream;

    return $old;
}
417 
/**
 * Check whether the stream is opened, i.e. whether the bucket currently
 * holds a valid (non-closed) resource.
 *
 * @return  bool
 */
public function isOpened()
{
    $resource = $this->getStream();

    return is_resource($resource);
}
427 
/**
 * Set the timeout period of the stream (see stream_set_timeout()).
 *
 * @param   int  $seconds         Timeout period in seconds.
 * @param   int  $microseconds    Additional microseconds.
 * @return  bool  Result of stream_set_timeout().
 */
public function setStreamTimeout($seconds, $microseconds = 0)
{
    $resource = $this->getStream();

    return stream_set_timeout($resource, $seconds, $microseconds);
}
439 
/**
 * Set the blocking/non-blocking mode of the stream (see
 * stream_set_blocking()).
 *
 * @param   bool  $mode    Blocking mode; it is cast to an integer before
 *                         being handed to stream_set_blocking().
 * @return  bool  Result of stream_set_blocking().
 */
public function setStreamBlocking($mode)
{
    $resource = $this->getStream();
    $flag     = (int) $mode;

    return stream_set_blocking($resource, $flag);
}
450 
/**
 * Set the write buffer size of the stream (see stream_set_write_buffer()).
 * The size is remembered only when the change succeeded.
 *
 * @param   int  $buffer    Buffer size in bytes.
 * @return  bool  True when stream_set_write_buffer() reported success.
 */
public function setStreamBuffer($buffer)
{
    // stream_set_write_buffer() returns 0 on success.
    if (0 !== stream_set_write_buffer($this->getStream(), $buffer)) {
        return false;
    }

    $this->_bufferSize = $buffer;

    return true;
}
476 
/**
 * Disable write buffering, i.e. ask for a write buffer of size 0.
 *
 * @return  bool  Result of setStreamBuffer(0).
 */
public function disableStreamBuffer()
{
    $unbuffered = 0;

    return $this->setStreamBuffer($unbuffered);
}
487 
/**
 * Get the buffer size last recorded by setStreamBuffer() (8192 when it was
 * never changed).
 *
 * @return  int
 */
public function getStreamBufferSize()
{
    $size = $this->_bufferSize;

    return $size;
}
497 
/**
 * Get the wrapper name of the stream, i.e. what precedes '://' in the
 * stream name.
 *
 * @return  string  Wrapper name, or 'file' when the stream name contains
 *                  no '://'.
 */
public function getStreamWrapperName()
{
    $streamName = $this->getStreamName();
    $separator  = strpos($streamName, '://');

    if (false === $separator) {
        return 'file';
    }

    return substr($streamName, 0, $separator);
}
511 
/**
 * Get the meta-data of the stream (see stream_get_meta_data()).
 *
 * @return  array  Result of stream_get_meta_data().
 */
public function getStreamMetaData()
{
    $resource = $this->getStream();

    return stream_get_meta_data($resource);
}
521 
/**
 * Tell whether this handler is borrowing its stream, i.e. whether the
 * register entry already existed when this handler asked for it.
 *
 * @return  bool
 */
public function isBorrowing()
{
    $borrowed = $this->_borrowed;

    return $borrowed;
}
531 
/**
 * Attach a callable to one of the listener IDs declared in the constructor
 * ('authrequire', 'authresult', 'complete', 'connect', 'failure',
 * 'mimetype', 'progress', 'redirect', 'resolve', 'size').
 *
 * @param   string  $listenerId    Listener ID.
 * @param   mixed   $callable      Callable to attach.
 * @return  Stream  This instance, to allow chaining.
 */
public function on($listenerId, $callable)
{
    $listener = $this->_on;
    $listener->attach($listenerId, $callable);

    return $this;
}
546 
/**
 * Stream notification callback: translate a STREAM_NOTIFY_* code into a
 * listener ID and fire the matching listener.
 *
 * Installed by open() as the 'notification' parameter of the context. The
 * signature follows PHP's stream notification callback.
 *
 * @param   int     $ncode          Notification code (STREAM_NOTIFY_*).
 * @param   int     $severity       Severity (STREAM_NOTIFY_SEVERITY_*).
 * @param   string  $message        Descriptive message, if any.
 * @param   int     $code           Message code, if any.
 * @param   int     $transferred    Number of transferred bytes, if
 *                                  applicable.
 * @param   int     $max            Total number of bytes, if applicable.
 * @return  void
 */
public function _notify(
    $ncode,
    $severity,
    $message,
    $code,
    $transferred,
    $max
) {
    static $_map = [
        STREAM_NOTIFY_AUTH_REQUIRED => 'authrequire',
        STREAM_NOTIFY_AUTH_RESULT   => 'authresult',
        STREAM_NOTIFY_COMPLETED     => 'complete',
        STREAM_NOTIFY_CONNECT       => 'connect',
        STREAM_NOTIFY_FAILURE       => 'failure',
        STREAM_NOTIFY_MIME_TYPE_IS  => 'mimetype',
        STREAM_NOTIFY_PROGRESS      => 'progress',
        STREAM_NOTIFY_REDIRECTED    => 'redirect',
        STREAM_NOTIFY_RESOLVE       => 'resolve',
        STREAM_NOTIFY_FILE_SIZE_IS  => 'size'
    ];

    // Unknown notification code: there is no listener to fire. Without
    // this guard the lookup below hits an undefined index and fires a null
    // listener ID.
    if (!isset($_map[$ncode])) {
        return;
    }

    // The original array declared the 'code' key twice with the same value;
    // the duplicate has been removed, the resulting data is unchanged.
    $this->_on->fire($_map[$ncode], new Core\Event\Bucket([
        'code'        => $code,
        'severity'    => $severity,
        'message'     => $message,
        'transferred' => $transferred,
        'max'         => $max
    ]));

    return;
}
593 
/**
 * Close every stream still present in the register.
 * This method is registered as a shutdown function at the bottom of this
 * file.
 *
 * @return  void
 */
final public static function _Hoa_Stream()
{
    foreach (self::$_register as $bucket) {
        $handler = $bucket[self::HANDLER];
        $handler->close();
    }
}
611 
/**
 * Transform the object into a string: the stream name.
 *
 * getStreamName() returns null while the stream is not opened, and
 * __toString() must return a string (anything else is a fatal error), hence
 * the cast: a non-opened stream is rendered as an empty string.
 *
 * @return  string
 */
public function __toString()
{
    return (string) $this->getStreamName();
}
621 
/**
 * Close the stream when the handler is destructed.
 *
 * @return  void
 */
public function __destruct()
{
    $this->close();
}
633 }
634 
/**
 * Protocol component named 'Stream': resolves an identifier to the stream
 * handler registered under that name.
 */
class _Protocol extends Core\Protocol
{
    /**
     * Component's name.
     *
     * @var string
     */
    protected $_name = 'Stream';



    /**
     * Resolve an ID to the matching stream handler.
     *
     * @param   string  $id    ID of the component, used as a stream name.
     * @return  Stream  Registered handler, or null when there is none.
     */
    public function reachId($id)
    {
        $handler = Stream::getStreamHandler($id);

        return $handler;
    }
}
665 
// Declare Hoa\Stream\Stream as a "flex entity".
// NOTE(review): the exact effect of flexEntity() is defined in
// Hoa\Core\Consistency, not in this file — presumably a class alias; confirm.
669 Core\Consistency::flexEntity('Hoa\Stream\Stream');
670 
// Register Stream::_Hoa_Stream() as a shutdown function; that method closes
// every stream still present in the register.
674 Core::registerShutdownFunction('\Hoa\Stream\Stream', '_Hoa_Stream');
675 
// Append the Stream protocol component to the 'Library' node of the protocol
// tree, so that an ID reaching it is resolved by _Protocol::reachId().
680 $protocol = Core::getInstance()->getProtocol();
681 $protocol['Library'][] = new _Protocol();
const RESOURCE
Definition: Stream.php:70
_notify($ncode, $severity, $message, $code, $transferred, $max)
Definition: Stream.php:561
setStreamTimeout($seconds, $microseconds=0)
Definition: Stream.php:435
static getStreamHandler($streamName)
Definition: Stream.php:379
static contextExists($id)
Definition: Context.php:114
& _open($streamName, Context $context=null)
setStreamBlocking($mode)
Definition: Stream.php:446
static & _getStream($streamName, Stream $handler, $context=null)
Definition: Stream.php:187
_setStream($stream)
Definition: Stream.php:399
static getInstance()
Definition: Core.php:193
static _Hoa_Stream()
Definition: Stream.php:603
__construct($streamName, $context=null, $wait=false)
Definition: Stream.php:148
static $_register
Definition: Stream.php:91
setStreamBuffer($buffer)
Definition: Stream.php:465
static registerShutdownFunction($class= '', $method= '')
Definition: Core.php:404
on($listenerId, $callable)
Definition: Stream.php:540
static getInstance($id)
Definition: Context.php:85