加入收藏 | 设为首页 | 会员中心 | 我要投稿 辽源站长网 (https://www.0437zz.com/)- 云专线、云连接、智能数据、边缘计算、数据安全!
当前位置: 首页 > 站长学院 > PHP教程 > 正文

php-beanstalkd消息队列类实例分享

发布时间:2021-02-02 22:18:19 所属栏目:PHP教程 来源:网络整理
导读:本文实例为大家分享了php beanstalkd消息队列类的具体代码,供大家参考,具体内容如下 use RuntimeException; /** An interface to the beanstalk queue service. Implements the beanstalk protocol spec 1.9. Where appropriate the documentation from th

/**

  • Pushes an error message to the logger,when one is configured.
  • @param string $message The error message.
  • @return void
    */
    protected function _error($message) {
    if ($this->_config['logger']) {
    $this->_config['logger']->error($message);
    }
    }

public function errors()
{
return $this->_config['logger'];
}
/**

  • Writes a packet to the socket. Prior to writing to the socket will
  • check for availability of the connection.
  • @param string $data
  • @return integer|boolean number of written bytes or false on error.
    */
    protected function _write($data) {
    if (!$this->connected) {
    $message = 'No connecting found while writing data to socket.';
    throw new RuntimeException($message);
    }
$data .= "rn";
return fwrite($this->_connection,$data,strlen($data));

}

/**

  • Reads a packet from the socket. Prior to reading from the socket

  • will check for availability of the connection.

  • @param integer $length Number of bytes to read.

  • @return string|boolean Data or false on error.
    */
    protected function _read($length = null) {
    if (!$this->connected) {
    $message = 'No connection found while reading data from socket.';
    throw new RuntimeException($message);
    }
    if ($length) {
    if (feof($this->_connection)) {
    return false;
    }
    $data = stream_get_contents($this->_connection,$length + 2);
    $meta = stream_get_meta_data($this->_connection);

    if ($meta['timed_out']) {
    $message = 'Connection timed out while reading data from socket.';
    throw new RuntimeException($message);
    }
    $packet = rtrim($data,"rn");
    } else {
    $packet = stream_get_line($this->_connection,16384,"rn");
    }
    return $packet;
    }

/ Producer Commands /

/**

  • The put command is for any process that wants to insert a job into the queue.
  • @param integer $pri Jobs with smaller priority values will be scheduled
  • before jobs with larger priorities. The most urgent priority is
  • 0; the least urgent priority is 4294967295.
  • @param integer $delay Seconds to wait before putting the job in the
  • ready queue. The job will be in the "delayed" state during this time.
  • @param integer $ttr Time to run - Number of seconds to allow a worker to
  • run this job. The minimum ttr is 1.
  • @param string $data The job body.
  • @return integer|boolean false on error otherwise an integer indicating
  • the job id.
    */
    public function put($pri,$delay,$ttr,$data) {
    $this->_write(sprintf("put %d %d %d %drn%s",$pri,strlen($data),$data));
    $status = strtok($this->_read(),' ');
switch ($status) {
  case 'INSERTED':
  case 'BURIED':
    return (integer) strtok(' '); // job id
  case 'EXPECTED_CRLF':
  case 'JOB_TOO_BIG':
  default:
    $this->_error($status);
    return false;
}

}

/**

  • The use command is for producers. Subsequent put commands will put
  • jobs into the tube specified by this command. If no use command has
  • been issued,jobs will be put into the tube named default.
  • @param string $tube A name at most 200 bytes. It specifies the tube to
  • use. If the tube does not exist,it will be created.
  • @return string|boolean false on error otherwise the name of the tube.
    */
    public function useTube($tube) {
    $this->_write(sprintf('use %s',$tube));
    $status = strtok($this->_read(),' ');
switch ($status) {
  case 'USING':
    return strtok(' ');
  default:
    $this->_error($status);
    return false;
}

}

/**

  • Pause a tube delaying any new job in it being reserved for a given time.
  • @param string $tube The name of the tube to pause.
  • @param integer $delay Number of seconds to wait before reserving any more
  • jobs from the queue.
  • @return boolean false on error otherwise true.
    */
    public function pauseTube($tube,$delay) {
    $this->_write(sprintf('pause-tube %s %d',$tube,$delay));
    $status = strtok($this->_read(),' ');
switch ($status) {
  case 'PAUSED':
    return true;
  case 'NOT_FOUND':
  default:
    $this->_error($status);
    return false;
}

}

/ Worker Commands /

(编辑:辽源站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

推荐文章
    热点阅读