Hoa central
Shared.php
Go to the documentation of this file.
1 <?php
2 
37 namespace Hoa\Worker\Backend;
38 
39 use Hoa\Core;
40 use Hoa\Fastcgi;
41 use Hoa\Socket;
42 use Hoa\Worker;
43 use Hoa\Zombie;
44 
73 class Shared implements Core\Event\Listenable
74 {
80  const TYPE_STOP = 0;
81 
87  const TYPE_MESSAGE = 1;
88 
94  const TYPE_INFORMATIONS = 2;
95 
101  protected $_socket = null;
102 
108  protected $_wid = null;
109 
115  protected $_on = null;
116 
122  protected $_password = null;
123 
129  protected $_startTime = 0;
130 
136  protected $_messages = 0;
137 
143  protected $_lastMessage = 0;
144 
145 
146 
157  public function __construct($workerId, $password)
158  {
159  if (!is_string($workerId) &&
160  !($workerId instanceof Socket\Client)) {
161  throw new Exception(
162  'Either you give a worker ID or you give an object of type ' .
163  '\Hoa\Socket\Client, but not anything else; given %s',
164  0,
165  is_object($workerId)
166  ? get_class($workerId)
167  : $workerId
168  );
169  }
170 
171  if (is_string($workerId)) {
172  $this->_wid = $workerId;
173  $handle = Worker\Run::get($workerId);
174  $workerId = $handle['socket'];
175  }
176 
177  set_time_limit(0);
178 
179  $this->_socket = $workerId;
180  $this->_on = new Core\Event\Listener($this, ['message']);
181  $this->_password = sha1($password);
182  $this->_startTime = microtime(true);
183 
184  return;
185  }
186 
195  public function on($listenerId, $callable)
196  {
197  $this->_on->attach($listenerId, $callable);
198 
199  return $this;
200  }
201 
209  public function run()
210  {
211  $server = new Socket\Server($this->_socket);
212  $server->connectAndWait();
213 
214  Zombie::fork();
215 
216  $_eom = pack('C', 0);
217 
218  while (true) {
219  foreach ($server->select() as $node) {
220  $request = unpack('nr', $server->read(2));
221  $length = unpack('Nl', $server->read(4));
222  $message = unserialize($server->read($length['l']));
223  $eom = unpack('Ce', $server->read(1));
224 
225  if ($eom['e'] != $_eom) {
226  $server->disconnect();
227 
228  continue;
229  }
230 
231  switch ($request['r']) {
232  case static::TYPE_MESSAGE:
233  $this->_on->fire(
234  'message',
235  new Core\Event\Bucket([
236  'message' => $message
237  ])
238  );
240  $this->_lastMessage = time();
241 
242  break;
243 
244  case static::TYPE_STOP:
245  if ($this->_password === $message) {
246  $server->disconnect();
247 
248  break 3;
249  }
250 
251  break;
252 
253  case static::TYPE_INFORMATIONS:
254  $message = [
255  'id' => $this->_wid,
256  'socket' => $this->_socket,
257  'start' => $this->_startTime,
258  'pid' => getmypid(),
259  'memory' => memory_get_usage(true),
260  'memory_allocated' => memory_get_usage(),
261  'memory_peak' => memory_get_peak_usage(true),
262  'memory_allocated_peak' => memory_get_usage(),
263  'messages' => $this->_messages,
264  'last_message' => $this->_lastMessage,
265  'filename' => $_SERVER['SCRIPT_FILENAME']
266  ];
267  $server->writeAll(
268  static::pack(static::TYPE_MESSAGE, $message)
269  );
270 
271  break;
272  }
273 
274  $server->disconnect();
275  }
276  }
277 
278  $server->disconnect();
279 
280  if (null !== $this->_wid) {
281  Worker\Run::unregister($this->_wid);
282  }
283 
284  return;
285  }
286 
295  public static function start($socket, $workerPath, Array $fastcgiParameters = [])
296  {
297  $server = new Fastcgi\Responder(
298  new Socket\Client($socket)
299  );
300 
301  $headers = [
302  'GATEWAY_INTERFACE' => 'FastCGI/1.0',
303  'SERVER_PROTOCOL' => 'HTTP/1.1',
304  'REQUEST_URI' => $workerPath,
305  'SCRIPT_FILENAME' => $workerPath,
306  'SCRIPT_NAME' => DS . dirname($workerPath)
307  ];
308 
309  $defaultFastcgiParameters = [
310  'REQUEST_METHOD' => 'GET'
311  ];
312 
313  return $server->send(
314  array_merge(
315  $defaultFastcgiParameters,
316  $fastcgiParameters,
317  $headers
318  )
319  );
320  }
321 
327  public function stop()
328  {
329  $client = new Socket\Client($this->_socket);
330  $client->connect();
331  $client->writeAll(static::pack(static::TYPE_STOP, $this->_password));
332  $client->disconnect();
333 
334  return true;
335  }
336 
344  public static function pack($type, $message)
345  {
346  $message = serialize($message);
347 
348  return
349  pack('n', $type) .
350  pack('N', strlen($message)) .
351  $message .
352  pack('C', 0);
353  }
354 }
static get($workerId)
Definition: Run.php:101
static unregister($workerId)
Definition: Run.php:85
on($listenerId, $callable)
Definition: Shared.php:195
__construct($workerId, $password)
Definition: Shared.php:157
static pack($type, $message)
Definition: Shared.php:344
static fork()
Definition: Zombie.php:66
static start($socket, $workerPath, Array $fastcgiParameters=[])
Definition: Shared.php:295