diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6a78056 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +composer.lock +vendor/* +.idea/* diff --git a/CronManager/Cron/FieldFactory.php b/CronManager/Cron/FieldFactory.php new file mode 100644 index 0000000..6ba3e08 --- /dev/null +++ b/CronManager/Cron/FieldFactory.php @@ -0,0 +1,64 @@ + + * @link http://en.wikipedia.org/wiki/Cron + */ +class FieldFactory +{ + /** + * @var array Cache of instantiated fields + */ + private $fields = array(); + + /** + * Get an instance of a field object for a cron expression position + * + * @param int $position CRON expression position value to retrieve + * + * @return FieldInterface + * @throws InvalidArgumentException if a position is not valide + */ + public function getField($position) + { + if (!isset($this->fields[$position])) { + switch ($position) { + case 0: + $this->fields[$position] = new SecondsField(); + break; + case 1: + $this->fields[$position] = new \Cron\MinutesField(); + break; + case 2: + $this->fields[$position] = new \Cron\HoursField(); + break; + case 3: + $this->fields[$position] = new \Cron\DayOfMonthField(); + break; + case 4: + $this->fields[$position] = new \Cron\MonthField(); + break; + case 5: + $this->fields[$position] = new \Cron\DayOfWeekField(); + break; + case 6: + $this->fields[$position] = new \Cron\YearField(); + break; + default: + throw new InvalidArgumentException( + $position . ' is not a valid position' + ); + } + } + + return $this->fields[$position]; + } +} \ No newline at end of file diff --git a/CronManager/Cron/SecondsField.php b/CronManager/Cron/SecondsField.php new file mode 100644 index 0000000..571c9c9 --- /dev/null +++ b/CronManager/Cron/SecondsField.php @@ -0,0 +1,46 @@ +isSatisfied($date->format('s'), $value); + } + + /** + * {@inheritdoc} + */ + public function increment(DateTime $date, $invert = false) + { + if ($invert) { + $date->sub(new DateInterval('PT1S')); + } else { + $date->add(new DateInterval('PT1S')); + } + + return $this; + } + + /** + * {@inheritdoc} + */ + public function validate($value) + { + return (bool) preg_match('/[\*,\/\-0-9]+/', $value); + } +} \ No newline at end of file diff --git a/CronManager/Download/Adapter/AdapterInterface.php b/CronManager/Download/Adapter/AdapterInterface.php new file mode 100644 index 0000000..9904e7f --- /dev/null +++ b/CronManager/Download/Adapter/AdapterInterface.php @@ -0,0 +1,21 @@ +_binPath.$this->_cmd." -o ".$path." '".$url."'"); + } + + /** + * Set bin path + * + * @param string $path + * @return \CronManager\Download\Adapter\Wget + */ + public function setBinPath($path) + { + $this->_binPath = rtrim($path, '/').'/'; + return $this; + } +} \ No newline at end of file diff --git a/CronManager/Download/Adapter/Wget.php b/CronManager/Download/Adapter/Wget.php new file mode 100644 index 0000000..2f15666 --- /dev/null +++ b/CronManager/Download/Adapter/Wget.php @@ -0,0 +1,85 @@ +_binPath.$this->_cmd; + $log = ""; + if (is_string($uri)) { + $cmd .= ' "'.$uri.'"'; + } elseif (is_array($uri)) { + if (isset($uri['url'])) { + $cmd .= ' "'.$uri['url'].'"'; + } + if (isset($uri['user'])) { + $cmd .= ' --user='.$uri['user']; + } + if (isset($uri['pass'])) { + $cmd .= ' --password='.$uri['pass']; + } + if (isset($uri['log'])) { + $log = "-o ".$uri['log']; + } + } elseif (is_object($uri)) { + if (isset($uri->url)) { + $cmd .= ' "'.$uri->url.'"'; + } + if (isset($uri->user)) { + $cmd .= ' --user='.$uri->user; + } + if (isset($uri->pass)) { + $cmd .= ' --password='.$uri->pass; + } + if (isset($uri->log)) { + $log = "-o ".$uri->log; + } + } + + return shell_exec($cmd.' -P '.$path." ".$log); + } + + /** + * Set bin path + * + * @param string $path + * @return \CronManager\Download\Adapter\Wget + */ + public function setBinPath($path) + { + $this->_binPath = rtrim($path, '/').'/'; + return $this; + } +} \ No newline at end of file diff --git a/CronManager/Download/Download.php b/CronManager/Download/Download.php new file mode 100644 index 0000000..ae9fa03 --- /dev/null +++ b/CronManager/Download/Download.php @@ -0,0 +1,91 @@ +setOptions($options); + } + + /** + * Set options + * + * @param array $options + * @return \CronManager\Download\Download + */ + public function setOptions(array $options) + { + if (array_key_exists('adapter', $options)) { + $this->setAdapter($options['adapter']); + } + } + + /** + * Set download adapter + * + * @param array|string|AdapterInterface $adapter + * @return \CronManager\Download\Download + */ + public function setAdapter($adapter) + { + if ($adapter instanceof AdapterInterface) { + $this->_adapter = $adapter; + return $this; + } + + if (is_string($adapter)) { + switch ($adapter) { + case Download::ADAPTER_CURL: + $this->_adapter = new Curl(); + break; + case Download::ADAPTER_WGET: + $this->_adapter = new Wget(); + break; + default: + throw new \Exception("Download adapter not set!"); + break; + } + } + + return $this; + } + + /** + * Download file + * + * @param string $uri + * @param string $path + * @return boolean + */ + public function download($uri, $path) + { + return $this->_adapter->download($uri, $path); + } +} + diff --git a/CronManager/Download/DownloadInterface.php b/CronManager/Download/DownloadInterface.php new file mode 100644 index 0000000..a03a52a --- /dev/null +++ b/CronManager/Download/DownloadInterface.php @@ -0,0 +1,21 @@ +_path = $path; + $this->_ftp = $ftp; + $this->_name = basename($this->_path); + } + + /** + * Provide read-only access to properties + * + * @param string $name The property to get + * @return mixed + */ + public function __get($name) + { + switch ($name) { + case 'name': + return $this->_name; + case 'path': + return $this->_path; + } + throw new Exception('Unknown property "' . $name . '"'); + } + + /** + * Get the contents of the current directory + * + * @return \CronManager\Ftp\Directory\Iterator + */ + public function getContents() + { + if ($this->_contents === null) { + $this->_changeToDir(); + $this->_contents = new Iterator($this->_path, $this->_ftp); + } + + return $this->_contents; + } + + /** + * Change to the current dir so that operations can be performed relatively + */ + protected function _changeToDir() + { + $chdir = ftp_chdir($this->_ftp->getConnection(), $this->_path); + if ($chdir === false) { + //throw new Exception('Unable to change to directory'); + } + } + + /** + * Whether or not this FTP resource is a file + * + * @return boolean + */ + public function isFile() + { + return false; + } + + /** + * Whether or not this FTP resource is a directory + * + * @return boolean + */ + public function isDirectory() + { + return true; + } + + /** + * Create a directory with optional recursion + * + * @param string|array $path The directory to create + * @param boolean $recursive [optional] Create all directories in the path + * @param string|int $permissions [optional] The permissions to set, can be a string e.g. 'rwxrwxrwx' or octal e.g. 0777 + * @return \CronManager\Ftp\Directory + */ + public function makeDirectory($path, $recursive = false, $permissions = null) + { + if (!is_array($path)) { + $path = explode('/', $path); + } + + $dir = array_shift($path); + + $currentDir = $this->getDirectory($dir); + if (count($path) == 0 || $recursive) { + $currentDir->create($permissions); + } + if (count($path) > 0) { + return $currentDir->makeDirectory($path, $recursive, $permissions); + } + + return $currentDir; + } + + /** + * Create the directory + * + * @return \CronManager\Ftp\Directory + */ + public function create($permissions = null) + { + $makedir = @ftp_mkdir($this->_ftp->getConnection(), $this->_path); + if ($makedir === false) { + //throw new Exception('Unable to create directory "' . $dir . '"'); + } + if ($permissions !== null) { + $chmod = $this->_ftp->chmod($this->_path, $permissions); + if ($chmod === false) { + //throw new Exception('Unable to chmod directory "' . $dir . '"'); + } + } + + return $this; + } + + /** + * Upload a local file to the current directory + * + * @param string $localFilepath The full path and filename to upload + * @param int $mode [optional] The transfer mode + * @param string $remoteFilename [optional] Filename to save to on the server + * @return File + */ + public function put($localFilepath, $mode = null, $remoteFilename = null) + { + if ($remoteFilename == null) { + $remoteFilename = basename($localFilepath); + } + $remoteFilepath = $this->_path . '/' . $remoteFilename; + + $file = new File($remoteFilepath, $this->_ftp); + $file->put($localFilepath, $mode); + + return $file; + } + + /** + * Get a file within the current directory + * + * @param string $filename The file to get + * @return File + */ + public function getFile($filename) + { + return new File($this->_path . '/' . $filename, $this->_ftp); + } + + /** + * Get a folder within the current directory + * + * @param string $filename The directory to get + * @return \CronManager\Ftp\Directory + */ + public function getDirectory($filename) + { + return new Directory($this->_path . '/' . $filename, $this->_ftp); + } + + /** + * Whether or not the directory exists + * + * @return boolean + */ + public function exists() + { + // Unfinished + } + + /** + * Delete the directory + * + * @param boolean $recursive [optional] Recursively delete contents + * @return \CronManager\Ftp\Directory + */ + public function delete($recursive = false) + { + // Unfinished + + return $this; + } + + /** + * Deletes the contents of the directory + * + * @param boolean $recursive [optional] Recursively delete contents + * @return \CronManager\Ftp\Directory + */ + public function deleteContents($recursive = false) + { + // Unfinished + + return $this; + } + + /** + * Rename the directory + * + * @param string $filename The new name + * @return \CronManager\Ftp\Directory + */ + public function rename($filename) + { + // Unfinished + + return $this; + } + + /** + * Copy the directory + * + * @param string $filename The new name + * @return \CronManager\Ftp\Directory + */ + public function copy($filename) + { + // Unfinished + + // Return the new directory + } + + /** + * Move the directory + * + * @param string $filename The new name + * @return \CronManager\Ftp\Directory + */ + public function move($filename) + { + // Unfinished + + return $this; + } + + /** + * Change the directory permissions + * + * @param int|string $permissions The permissions + * @return \CronManager\Ftp\Directory + */ + public function chmod($permissions) + { + // Unfinished + + return $this; + } + + /** + * Save the directory to the given path + * + * @param boolean $recursive [optional] Save the contents recursively + * @return \CronManager\Ftp\Directory + */ + public function saveToPath($recursive = false) + { + // Unfinished + + return $this; + } + + /** + * Save the directory contents to the given path + * + * @param boolean $recursive [optional] Save the contents recursively + * @return \CronManager\Ftp\Directory + */ + public function saveContentsToPath($recursive = false) + { + // Unfinished + + return $this; + } +} \ No newline at end of file diff --git a/CronManager/Ftp/Directory/Iterator.php b/CronManager/Ftp/Directory/Iterator.php new file mode 100644 index 0000000..37e5d52 --- /dev/null +++ b/CronManager/Ftp/Directory/Iterator.php @@ -0,0 +1,254 @@ +_dir = $dir; + $this->_filter = $filter; + $this->_ftp = $ftp; + + $lines = @ftp_rawlist($this->_ftp->getConnection(), $dir); + + foreach ($lines as $line) { + preg_match('/^([\-dl])([rwx\-]+)\s+(\d+)\s+(\w+)\s+(\w+)\s+(\d+)\s+(\w+\s+\d+\s+[\d\:]+)\s+(.*)$/', $line, $matches); + + list($trash, $type, $permissions, $unknown, $owner, $group, $bytes, $date, $name) = $matches; + + if ($type != 'l') { + $this->_data[] = array( + 'type' => $type, + 'permissions' => $permissions, + 'bytes' => $bytes, + 'name' => $name, + ); + } + } + + $this->_count = count($this->_data); + } + + /** + * Rewind the pointer, required by Iterator + * + * @return \CronManager\Ftp\Ftp\Directory_Iterator + */ + public function rewind() + { + $this->_pointer = 0; + + return $this; + } + + /** + * Get the current row, required by iterator + * + * @return \CronManager\Ftp\Ftp\Directory|CronManager\Ftp\Ftp\File|null + */ + public function current() + { + if ($this->valid() === false) { + return null; + } + + if (empty($this->_rows[$this->_pointer])) { + $row = $this->_data[$this->_pointer]; + switch ($row['type']) { + case 'd': // Directory + $this->_rows[$this->_pointer] = new Directory($this->_dir . $row['name'] . '/', $this->_ftp, array('bytes' => $row['bytes'], 'permissions' => $row['permissions'])); + break; + case '-': // File + $this->_rows[$this->_pointer] = new File($this->_dir . $row['name'], $this->_ftp, array('bytes' => $row['bytes'], 'permissions' => $row['permissions'])); + break; + case 'l': // Symlink + default: + } + } + + return $this->_rows[$this->_pointer]; + } + + /** + * Return the key of the current row, required by iterator + * + * @return integer + */ + public function key() + { + return $this->_pointer; + } + + /** + * Continue the pointer to the next row, required by iterator + */ + public function next() + { + ++$this->_pointer; + } + + /** + * Whether or not there is another row, required by iterator + * + * @return boolean + */ + public function valid() + { + return $this->_pointer < $this->_count; + } + + /** + * Return the number of rows, required by countable + * + * @return integer + */ + public function count() + { + return $this->_count; + } + + /** + * Seek to the given position, required by seekable + * + * @param int $position + * @return \CronManager\Ftp\Ftp\Directory_Iterator + */ + public function seek($position) + { + $position = (int)$position; + if ($position < 0 || $position >= $this->_count) { + throw new Exception('Illegal index ' . $position); + } + $this->_pointer = $position; + + return $this; + } + + /** + * Whether or not the offset exists, required by seekable + * + * @param int $offset + * @return boolean + */ + public function offsetExists($offset) + { + return isset($this->_data[(int)$offset]); + } + + /** + * Get the item at the given offset, required by seekable + * + * @param int $offset + * @return \CronManager\Ftp\Ftp\Directory|CronManager\Ftp\Ftp\File|null + */ + public function offsetGet($offset) + { + $this->_pointer = (int)$offset; + + return $this->current(); + } + + /** + * Set the item at the given offset (ignored), required by seekable + * + * @param int $offset + * @param mixed $value + */ + public function offsetSet($offset, $value) + { + } + + /** + * Unset the item at the given offset (ignored), required by seekable + * + * @param int $offset + */ + public function offsetUnset($offset) + { + } + + /** + * Get a given row, required by seekable + * + * @param int $position + * @param boolean $seek [optional] + * @return \CronManager\Ftp\Ftp\Directory|CronManager\Ftp\Ftp\File|null + */ + public function getRow($position, $seek = false) + { + $key = $this->key(); + try { + $this->seek($position); + $row = $this->current(); + } catch (Exception $e) { + throw new Exception('No row could be found at position ' . (int)$position); + } + if ($seek == false) { + $this->seek($key); + } + + return $row; + } +} \ No newline at end of file diff --git a/CronManager/Ftp/Exception.php b/CronManager/Ftp/Exception.php new file mode 100644 index 0000000..92324c3 --- /dev/null +++ b/CronManager/Ftp/Exception.php @@ -0,0 +1,13 @@ +_path = $path; + $this->_ftp = $ftp; + $this->_name = basename($this->path); + $this->_options = $options; + } + + /** + * Provide read-only access to properties + * + * @param string $name The property to get + * @return mixed + */ + public function __get($name) + { + switch ($name) { + case 'name': + return $this->_name; + case 'path': + return $this->_path; + case 'size': + $size = ftp_size($this->_ftp->getConnection(), $this->_name); + return ($size != -1) ? $size : false; + } + throw new Exception('Unknown property "' . $name . '"'); + } + + /** + * Whether or not this FTP resource is a file + * + * @return boolean + */ + public function isFile() + { + return true; + } + + /** + * Whether or not this FTP resource is a directory + * + * @return boolean + */ + public function isDirectory() + { + return false; + } + + /** + * Set the transfer mode for this file, overrides the FTP connection default + * + * @param int $mode [optional] The transfer mode + * @return \CronManager\Ftp\File + */ + public function setMode($mode = null) + { + $this->_mode = $mode; + + return $this; + } + + /** + * Save to a local path using the remote file name + * + * @param string $path The full path to save to + * @param int $mode [optional] The transfer mode + * @param int $offset [optional] The offset to start from for resuming + * @return \CronManager\Ftp\File + */ + public function saveToPath($path, $mode = null, $offset = 0) + { + if (substr($path, -1) != '/') { + $path = $path . '/'; + } + $this->saveToFile($path . basename($this->_name), $mode, $offset); + + return $this; + } + + /** + * Save to a local file + * + * @param string $file The full path to the local file + * @param int $mode [optional] The transfer mode + * @param int $offset [optional] The offset to start from for resuming + * @return \CronManager\Ftp\File + */ + public function saveToFile($file, $mode = null, $offset = 0) + { + if ($mode === null) { + $mode = ($this->_mode === null ? $this->_ftp->determineMode($this->_path) : $this->_mode); + } + + $get = @ftp_get($this->_ftp->getConnection(), $file, $this->_path, $mode, $offset); + if ($get === false) { + //throw new Exception('Unable to save file "' . $this->path . '"') + } + + return $this; + } + + /** + * Upload a local file + * + * @param string $localFilepath The full path to the local file + * @param int $mode [optional] The transfer mode + * @param int $startPos [optional] The offset to start from for resuming + * @return \CronManager\Ftp\File + */ + public function put($localFilepath, $mode = null, $startPos = 0) + { + if ($mode === null) { + $mode = ($this->_mode === null ? $this->_ftp->determineMode($localFilepath) : $this->_mode); + } + $put = @ftp_put($this->_ftp->getConnection(), $this->_path, $localFilepath, $mode, $startPos); + if ($put === false) { + //throw new Exception('Unable to put file "' . $this->path . '"') + } + + return $this; + } + + /** + * Change the file permissions + * + * @param int|string $mode + * @return \CronManager\Ftp\File + */ + public function chmod($mode) + { + $this->_ftp->chmod($this->_path, $mode); + + return $this; + } + + /** + * Rename the file + * + * @param string $filename The new filename + * @return \CronManager\Ftp\File + */ + public function rename($filename) + { + // ftp_rename + + return $this; + } + + /** + * Copy the file to another filename or location + * + * @param string $filename + * @return \CronManager\Ftp\File + */ + public function copy($filename) + { + // copy + } + + /** + * Move the file to another location + * + * @param string $path + * @return \CronManager\Ftp\File + */ + public function move($path) + { + // move + + return $this; + } + + /** + * Delete the file + * + * @return \CronManager\Ftp\File + */ + public function delete() + { + // delete + + return $this; + } + + /** + * Whether or not the file exists + * + * @return boolean + */ + public function exists() + { + // Unfinished + } +} \ No newline at end of file diff --git a/CronManager/Ftp/Ftp.php b/CronManager/Ftp/Ftp.php new file mode 100644 index 0000000..5fa910e --- /dev/null +++ b/CronManager/Ftp/Ftp.php @@ -0,0 +1,437 @@ +_host = $host; + $this->_username = $username; + $this->_password = $password; + $this->_port = $port; + $this->_timeout = $timeout; + } + + /** + * Connect to the FTP server and login + */ + protected function _connect() + { + if ($this->_connection === null) { + if ($this->_ssl) { + $connection = @ftp_ssl_connect($this->_host, $this->_port, $this->_timeout); + } else { + $connection = @ftp_connect($this->_host, $this->_port, $this->_timeout); + } + if ($connection === false) { + throw new Exception('Unable to connect to host "' . $this->_host . '" on port ' . $this->_port); + } + + $this->_connection = $connection; + + $login = @ftp_login($this->_connection, $this->_username, $this->_password); + if ($login === false) { + throw new Exception('Unable to login with username "' . $this->_username); + } + + if ($this->_passive) { + $this->_setPassive(); + } + + $path = @ftp_pwd($this->_connection); + if ($path === false) { + throw new Exception('Unable to get current directory'); + } + + $this->_currentPath = $path; + } + } + + /** + * Whether or not it's connected to the server + * + * @return boolean + */ + public function isConnected() + { + return $this->_connection === null; + } + + /** + * Get the FTP connection + * + * @return resource + */ + public function getConnection() + { + $this->_connect(); + + return $this->_connection; + } + + /** + * Get a directory given an absolute pathname + * + * @param string $filename The directory to get + * @return Directory + */ + public function getDirectory($filename = '') + { + if (empty($filename)) { + return $this->getCurrentDirectory(); + } + + $this->_connect(); + + return new Directory($filename, $this); + } + + /** + * Get a file given an absolute pathname + * + * @param string $filename The file to get + * @return File + */ + public function getFile($filename) + { + $this->_connect(); + + return new File($filename, $this); + } + + /** + * Set the command timeout period in seconds + * + * @param int $timeout The timeout period + * @return Ftp + */ + public function setTimeout($timeout) + { + $this->_timeout = $timeout; + if ($this->_connection !== null) { + $option = @ftp_set_option($this->_connection, FTP_TIMEOUT_SEC, $this->_timeout); + if ($option === false) { + throw new Exception('Unable to set timeout'); + } + } + + return $this; + } + + /** + * Set whether or not to use an SSL connection + * + * @param boolean $ssl [optional] + * @return Ftp + */ + public function setSecure($ssl = true) + { + $this->_ssl = $ssl; + + return $this; + } + + /** + * Turn passive mode on or off + * + * @param boolean $passive [optional] Whether or not to use passive mode + * @return Ftp + */ + public function setPassive($passive = true) + { + $this->_passive = $passive; + $this->_setPassive(); + + return $this; + } + + /** + * Send the PASV command + * + * @return Ftp + */ + protected function _setPassive() + { + if ($this->_connection !== null) { + $pasv = @ftp_pasv($this->_connection, $this->_passive); + if ($pasv === false) { + throw new Exception('Unable to set passive mode'); + } + } + + return $this; + } + + /** + * Set the default transfer mode + * + * @param int $mode The transfer mode + * @return Ftp + */ + public function setMode($mode) + { + switch ($mode) { + case self::MODE_ASCII: + case self::MODE_BINARY: + case self::MODE_AUTO: + $this->_currentMode = $mode; + break; + default: + throw new Exception('Unknown FTP transfer mode'); + } + + return $this; + } + + /** + * Get the current Directory + * + * @return Directory + */ + public function getCurrentDirectory() + { + if ($this->_currentDirectory === null) { + $this->_connect(); + + $this->_currentDirectory = new Directory($this->_currentPath, $this); + } + + return $this->_currentDirectory; + } + + /** + * Determine the transfer mode for the given filename + * + * @param string $filename + * @return int + */ + public function determineMode($filename) + { + if ($this->_currentMode == self::MODE_AUTO) { + $extension = pathinfo($filename, PATHINFO_EXTENSION); + if (in_array($extension, $this->_asciiTypes)) { + return self::MODE_ASCII; + } + return self::MODE_BINARY; + } + return $this->_currentMode; + } + + /** + * Set the ASCII file types for automatic transfer mode + * + * @param array $types + * @return Ftp + */ + public function setAsciiTypes($types) + { + $this->_asciiTypes = array_unique($types); + + return $this; + } + + /** + * Add an ASCII file type for automatic transfer mode + * + * @param string $type + * @return Ftp + */ + public function addAsciiType($type) + { + $types = $this->_asciiTypes; + $types[] = $type; + $this->setAsciiTypes($types); + + return $this; + } + + /** + * Disconnect if connected + */ + public function __destruct() + { + if ($this->_connection !== null) { + @ftp_close($this->_connection); + } + } + + /** + * Change the permissions of a file or directory + * + * @param string $path The file or directory + * @param int|string $permissions The permissions as an octal e.g. 0777 or string e.g. 'rwxrwxrwx' + * @return + */ + public function chmod($path, $permissions) + { + $chmod = @ftp_chmod($this->_connection, $this->_parsePermissions($permissions), $path); + if ($chmod === false) { + // For some reason ftp_chmod will return false even if it's successful so we need to check manually + //throw new Exception('Unable to change permissions of "' . $path . '"'); + } + + return $this; + } + + /** + * Converts string permissions into octal format + * + * @param int|string $permissions The permissions + * @return integer + */ + protected function _parsePermissions($permissions) + { + if (!is_int($permissions) && 0 == preg_match('/^[rwx\-]{9}$/', $permissions)) { + throw new Exception('Invalid permissions format'); + } + $perms = array( + '-' => 0, + 'r' => 1, + 'w' => 2, + 'x' => 4, + ); + if (is_string($permissions)) { + $parts = str_split($permissions, 1); + $owner = $perms[$parts[0]] + $perms[$parts[1]] + $perms[$parts[2]]; + $group = $perms[$parts[3]] + $perms[$parts[4]] + $perms[$parts[5]]; + $world = $perms[$parts[6]] + $perms[$parts[7]] + $perms[$parts[8]]; + $permString = '0' . $owner . $group . $world; + eval('$permissions = ' . $permString . ';'); + } + return $permissions; + } + + /** + * Utility method to create an instance for chaining + * + * @param string $host The FTP host + * @param string $username The login username + * @param string $password The login password + * @param int $port [optional] The port to connect to + * @param int $timeout [optional] The command timeout + * @return Ftp + */ + public static function connect($host, $username, $password, $port = 21, $timeout = 90) + { + return new self($host, $username, $password, $port, $timeout); + } +} \ No newline at end of file diff --git a/CronManager/Http/Client/Curl.php b/CronManager/Http/Client/Curl.php new file mode 100644 index 0000000..cc73541 --- /dev/null +++ b/CronManager/Http/Client/Curl.php @@ -0,0 +1,150 @@ + true, // return web page + CURLOPT_USERPWD => "$user:$pass", + CURLOPT_HEADER => false, // don't return headers + CURLOPT_FOLLOWLOCATION => true, // follow redirects + CURLOPT_ENCODING => "", // handle all encodings +// CURLOPT_USERAGENT => "tieste", // who am i + CURLOPT_AUTOREFERER => true, // set referer on redirect + CURLOPT_CONNECTTIMEOUT => 120, // timeout on connect + CURLOPT_TIMEOUT => 120, // timeout on response + CURLOPT_MAXREDIRS => 10, // stop after 10 redirects + ); + + $ch = curl_init($url); + curl_setopt_array($ch, $options); + $content = curl_exec($ch); + $err = curl_errno($ch); + $errmsg = curl_error($ch); + $header = curl_getinfo($ch); + curl_close($ch); + + $header['errno'] = $err; + $header['errmsg'] = $errmsg; + $header['content'] = $content; + + return $header; + } + + public static function downloadFile($url, $path, $user='', $pass='', $progressCallback=null) + { + $fp = fopen($path, 'w+'); + + $options = array( +// CURLOPT_RETURNTRANSFER => true, + CURLOPT_FOLLOWLOCATION => true, + CURLOPT_FILE => $fp, + CURLOPT_USERPWD => "$user:$pass", + CURLOPT_BUFFERSIZE => 32768 + ); + + if ($progressCallback) { + $options[CURLOPT_NOPROGRESS] = false; + $options[CURLOPT_PROGRESSFUNCTION] = $progressCallback; + } + + $ch = curl_init($url); + curl_setopt_array($ch, $options); + + $result = curl_exec($ch); + curl_close($ch); + fclose($fp); + + return $result; + } + + public static function getFileTimestamp($path, $user='', $pass='') + { + $options = array( + CURLOPT_NOBODY => true, + CURLOPT_RETURNTRANSFER => true, + CURLOPT_HEADER => true, + CURLOPT_USERPWD => "$user:$pass" + ); + + $ch = curl_init($path); + curl_setopt_array($ch, $options); + + $data = curl_exec($ch); + curl_close($ch); + + if (preg_match('/Last-Modified: ([0-9a-zA-Z:, ]+)/', $data, $matches)) + return $matches['0']; + + return null; + } + + public static function getFilesize($path, $user='', $pass='') + { + $options = array( + CURLOPT_RETURNTRANSFER => TRUE, + + CURLOPT_NOBODY => true, + CURLOPT_RETURNTRANSFER => true, + CURLOPT_HEADER => true, + CURLOPT_USERPWD => "$user:$pass" + ); + + $ch = curl_init($path); + curl_setopt_array($ch, $options); + + $data = curl_exec($ch); + $size = curl_getinfo($ch, CURLINFO_CONTENT_LENGTH_DOWNLOAD); + + if (!$size || $size == -1) { + $http_code = curl_getinfo($ch, CURLINFO_HTTP_CODE); + if ($http_code == 301 || $http_code == 302) { + list($header) = explode("\r\n\r\n", $data, 2); + $matches = []; + preg_match('/Location:(.*?)\n/', $header, $matches); + $url = @parse_url(trim(array_pop($matches))); + if (!$url) { + //couldn't process the url to redirect to + $curl_loops = 0; + return null; + } + $last_url = parse_url(curl_getinfo($ch, CURLINFO_EFFECTIVE_URL)); + if (!isset($url['scheme'])) { + $url['scheme'] = $last_url['scheme']; + } + if (!isset($url['host'])) { + $url['host'] = $last_url['host']; + } + if (!isset($url['path'])) { + $url['path'] = $last_url['path']; + } + $new_url = $url['scheme'] . '://' . $url['host'] . $url['path'] . (isset($url['query']) ? '?'.$url['query']:''); + curl_setopt($ch, CURLOPT_URL, $new_url); + $data = curl_exec($ch); + $size = curl_getinfo($ch, CURLINFO_CONTENT_LENGTH_DOWNLOAD); + } + } + curl_close($ch); + /*if (preg_match('/Content-Length: (\d+)/', $data, $matches)) { + return (int)$matches[1]; + }*/ + + return ($size == -1) ? null : $size; + } + + public static function clearStreams() + { + foreach (self::$handles as $handle) curl_close ($handle); + } +} \ No newline at end of file diff --git a/CronManager/Manager/AbstractManager.php b/CronManager/Manager/AbstractManager.php new file mode 100644 index 0000000..2e96c57 --- /dev/null +++ b/CronManager/Manager/AbstractManager.php @@ -0,0 +1,333 @@ +_pool as $pid => $job) { + if (!$job->isRunning()) { + $this->_message = "Stopping job ".$this->_pool[$pid]->name()." ($pid)" . PHP_EOL; + $this->notify(); + $this->stopJob($pid); + } else { + $running_jobs++; + } + } + + return $running_jobs; + } + + /** + * Return free pool index + * + * @return integer + */ + protected function _existsFreePool() + { + return (count($this->_pool) < $this->_MAX_POOL) ? true : false; + } + + /** + * Run new job + * + * @param string $cmd + * @param string $name + * @param array $observers + * + * @return boolean|integer + */ + public function startJob($cmd, $name = 'job', array $observers = array()) + { + // broadcast existing jobs + $this->checkJobs(); + + /**$free_pool_slots = $this->_MAX_POOL - count($this->_pool); + + if ($free_pool_slots <= 0) { + // output error "no free slots in the pool" + return false; + } + + if (!$this->_existsFreePool()) { + return false; + }*/ + + $job = new Job($cmd, $name); + $job->execute(); + $pid = $job->pid(); + $this->_message = "Starting job $name ($pid) ".date("H:i:s").PHP_EOL; + $this->notify(); + + $this->_pool[$pid] = $job; + $this->_streams[$pid] = $this->_pool[$pid]->getPipe(); + $this->_stderr[$pid] = $this->_pool[$pid]->getStderr(); + + foreach ($observers as $params) { + $observer = $this->_pool[$pid]->observerFactory($params); + $this->_pool[$pid]->addObserver($observer); + } + $job->notify(); + + return $pid; + } + + /** + * Destroy(kill) job(daemon) + * + * @param integer $pid + * @return boolean + */ + public function stopJob($pid) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + unset($this->_streams[$pid]); + unset($this->_stderr[$pid]); + unset($this->_pool[$pid]); + + return true; + } + + /** + * Get job + * + * @param integer $pid + * @return \CronManager\Manager\Job|boolean + */ + public function getJob($pid) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + + return $this->_pool[$pid]; + } + + /** + * Retrun job name + * + * @param integer $pid + * @return boolean|string + */ + public function name($pid) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + + return $this->_pool[$pid]->name(); + } + + public function pipeline($pid, $nohup = FALSE) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + + return $this->_pool[$pid]->pipeline($nohup); + } + + public function stderr($pid, $nohup = FALSE) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + + return $this->_pool[$pid]->stderr($nohup); + } + + protected function broadcastMessage($msg) + { + // sends selected signal to all child processes + foreach ($this->_pool as $pid => $job) { + $job->message($msg); + } + } + + protected function broadcastSignal($sig) + { + // sends selected signal to all child processes + foreach ($this->_pool as $pid => $job) { + $job->signal($sig); + } + } + + /** + * Dispatcher + * + * @param array $params + * @return void + */ + protected function _dispatch(array $params) + { + switch ($params['type']) { + case "exit": + $this->broadcastSignal(SIGTERM); + $this->_is_terminated = TRUE; + break; + case 'kill': + $this->stopJob($params['pid']); + break; + case "start": + $jobCmd = implode(" ", $params['args']); + $observers = isset($params['observers']) ? $params['observers'] : array(); + $this->startJob($jobCmd, $params['name'], $observers); + break; + default: + echo "Dispatch error: "; + var_dump($params); + break; + } + } + + /** + * Show process status by pid + * + * @param integer $pid + * @return array + */ + public function show($pid = null) + { + if ($this->checkJobs() == 0) { + $this->_message = "No running jobs\n"; + $this->notify(); + } + + foreach ($this->_pool as $pid => $job) { + var_dump($job->getStatus2()); + } + } + + /** + * Read stdin by all runnning jobs with timoute + * + * @param string $write + * @param string $except + * @return void + */ + protected function _stdRead(&$write, &$except) + { + $read = $this->_streams; + $except = $this->_stderr; + + if (!(is_array($read) && count($read) > 0)) { + return; + } + if (false === ($num_changed_streams = stream_select($read, $write, $except, $this->_streamTimeoutSecond, $this->_streamTimeoutMicrosecond))) { + $this->_message = "Some stream error\n"; + $this->notify(); + return; + } + if ($num_changed_streams <= 0) { + return; + } + if (!(is_array($read) && count($read) > 0)) { + return; + } + $cmp_array = $this->_streams; + foreach ($read as $resource) { + $pid = array_search($resource, $cmp_array, TRUE); + if ($pid === FALSE) { + continue; + } + // читаем сообщения процессов + $pool_content = $this->pipeline($pid, TRUE); + $this->_processMessage($pid, $pool_content, 1); + $pool_error = $this->stderr($pid, TRUE); + $this->_processMessage($pid, $pool_error, 2); + continue; + + $job_name = $this->name($pid); + + if ($pool_content) { + echo $pool_content; + } + + if ($pool_error) { + $this->_message = $job_name ." ($pool_index)" . ' [STDERR]: ' . $pool_error.PHP_EOL; + $this->notify(); + } + } + } + + /** + * Process job message + * + * @param integer $pid + * @param string $message + * @param integer $type + * @return void + */ + abstract protected function _processMessage($pid, $message, $type); + + /** + * Run manager + * + * @return void + */ + abstract public function process(); + + /** + * Destructor + */ + public function __destruct() + { + // destroy pool + foreach (array_keys($this->_pool) as $pid) { + $this->stopJob($pid); + } + } + +} \ No newline at end of file diff --git a/CronManager/Manager/Executable.php b/CronManager/Manager/Executable.php new file mode 100644 index 0000000..1e7ff81 --- /dev/null +++ b/CronManager/Manager/Executable.php @@ -0,0 +1,199 @@ +setDi($dependencyInjector); + $this->_init(); + + $executable = $this; + $handler = function ($signo) use ($executable) { + $executable->signalHandler($signo); + }; + pcntl_signal(SIGTERM, $handler); + pcntl_signal(SIGINT, $handler); + pcntl_signal(SIGUSR1, $handler); + pcntl_signal(SIGUSR2, $handler); + + stream_set_blocking(STDIN, 0); + stream_set_blocking(STDOUT, 0); + stream_set_blocking(STDERR, 0); + } + + /** + * Init function + * + * @return void + */ + protected function _init() + { + } + + /** + * Process user signal + * + * @return void + */ + public function signalHandler($signo) + { + switch ($signo) { + case SIGTERM: + case SIGHUP: + case SIGINT: + $this->_is_terminated = TRUE; + echo "exiting in ".get_class($this)."...\n"; + break; + case SIGUSR1: + $this->checkStdin(); + break; + case SIGUSR2: + $this->_is_terminated = TRUE; + echo "[SHUTDOWN] in " . get_class($this) . PHP_EOL; + flush(); + exit(1); + break; + default: + // handle all other signals + break; + } + } + + /** + * Get user content and dispatch it + * + * @return void + */ + protected function checkStdin() + { + $read = array(STDIN); + $write = NULL; + $except = NULL; + + if (is_array($read) && count($read) > 0) { + if (false === ($num_changed_streams = stream_select($read, $write, $except, 1))) { + // oops + } elseif ($num_changed_streams > 0) { + if (is_array($read) && count($read) > 0) { + // stdin + $content = ''; + while ($cmd = fgets(STDIN)) { + if (!$cmd) { + break; + } + $content .= $cmd; + } + $this->dispatch($content); + //echo "recieved $content"; + //echo "stdin> " . $cmd; + } + } + } + //usleep(200000); + } + + /** + * Calls signal handlers for pending signals + * + * @return boolean + */ + protected function isTerminated() + { + pcntl_signal_dispatch(); + if ($this->_is_terminated) { + $this->cleanup(); + } + + return $this->_is_terminated; + } + + + /** + * Final handler, if user stop process exec cleanup function + * + * @return void + */ + private function cleanup() + { + if (is_callable($this->_cleanup_function)) { + call_user_func($this->_cleanup_function); + } + } + + /** + * Register cleanup handler + */ + protected function registerCleanup($callable) + { + if (is_callable($callable)) { + $this->_cleanup_function = $callable; + } else { + trigger_error("$callable is not callable func", E_USER_WARNING); + } + } + + /** + * Command dispatcher + * + * @return void + */ + protected function dispatch($cmd) + { + if (null !== ($params = json_decode($cmd, true))) { + $this->_execute($params); + } + } + + /** + * Execute user command + * + * @return void + */ + protected function _execute(array $params) + { + } + + /** + * Destructor + */ + public function __destruct() + { + //echo "destructor called in " . get_class($this) . PHP_EOL; + if (!$this->_is_terminated) { + $this->_is_terminated = TRUE; + $this->isTerminated(); + } + } + +} diff --git a/CronManager/Manager/Job.php b/CronManager/Manager/Job.php new file mode 100644 index 0000000..8524990 --- /dev/null +++ b/CronManager/Manager/Job.php @@ -0,0 +1,350 @@ +_cmd = $cmd; + $this->_name = $name; + } + + /** + * Destructor + */ + public function __destruct() + { + // wait process finish + if ($this->_resource) { + if ($this->_waitpid && $this->isRunning()) { + $this->_message = "Waiting for job to complete"; + $this->notify(); + $status = NULL; + pcntl_waitpid($this->_pid, $status, WNOHANG); + + /*while ($this->isRunning()) { + echo '.'; + sleep(1); + }*/ + } + } + + // close descriptor + if (isset($this->_pipes) && is_array($this->_pipes)) { + foreach (array_keys($this->_pipes) as $index ) { + if (is_resource($this->_pipes[$index])) { + fflush($this->_pipes[$index]); + fclose($this->_pipes[$index]); + unset($this->_pipes[$index]); + } + } + } + + // close opened handler + if ($this->_resource) { + proc_close($this->_resource); + unset($this->_resource); + } + $this->_message = "Stop"; + $this->notify(); + } + + /** + * Return process pid + * + * @return integer + */ + public function pid() + { + return $this->_pid; + } + + /** + * Return job name + * + * @return string + */ + public function name() + { + return $this->_name; + } + + /** + * Read process messages + * + * @return string + */ + private function readPipe($index, $nohup = FALSE) + { + if (!isset($this->_pipes[$index])) { + return FALSE; + } + + if (!is_resource($this->_pipes[$index]) || feof($this->_pipes[$index])) { + return FALSE; + } + if ($nohup) { + $data = ''; + while ($line = fgets($this->_pipes[$index])) { + $data .= $line; + } + $this->_message = $data; + $this->notify($index); + + return $data; + } + + while ($data = fgets($this->_pipes[$index])) { + $this->_message = $data; + $this->notify($index); + } + } + + /** + * Read process messages + * + * @return string + */ + public function pipeline($nohup = FALSE) + { + return $this->readPipe(1, $nohup); + } + + /** + * Read process errors + * + * @return string + */ + public function stderr($nohup = FALSE) + { + return $this->readPipe(2, $nohup); + } + + /** + * Run process and return his pid + * + * @return integer + */ + public function execute() + { + // определяем откуда будет читать и куда писать процесс + $descriptorspec = array( + 0 => array('pipe', 'r'), // stdin + 1 => array('pipe', 'w'), // stdout + 2 => array('pipe', 'w') // stderr + ); + + $this->_resource = proc_open('exec '.$this->_cmd, $descriptorspec, $this->_pipes); + + // ставим неблокирующий режим всем дескрипторам + stream_set_blocking($this->_pipes[0], 0); + stream_set_blocking($this->_pipes[1], 0); + stream_set_blocking($this->_pipes[2], 0); + + if (!is_resource($this->_resource)) { + return FALSE; + } + + $proc_status = proc_get_status($this->_resource); + $this->_pid = isset($proc_status['pid']) ? $proc_status['pid'] : 0; + + $this->_message = "Start"; + $this->notify(); + + return $this->_pid; + } + + /** + * Terminate process + * + * @return boolean + */ + public function terminate() + { + // close descriptor + if (isset($this->_pipes) && is_array($this->_pipes)) { + foreach (array_keys($this->_pipes) as $index ) { + if (is_resource($this->_pipes[$index])) { + fflush($this->_pipes[$index]); + fclose($this->_pipes[$index]); + unset($this->_pipes[$index]); + } + } + } + + // close opened handler + if ($this->_resource) { + proc_terminate($this->_resource); + unset($this->_resource); + } + $this->_message = "Process terminated"; + $this->notify(); + + return true; + } + + /** + * Terminate process + * + * @return boolean + */ + public function kill() + { + // close descriptor + if (isset($this->_pipes) && is_array($this->_pipes)) { + foreach (array_keys($this->_pipes) as $index ) { + if (is_resource($this->_pipes[$index])) { + fflush($this->_pipes[$index]); + fclose($this->_pipes[$index]); + unset($this->_pipes[$index]); + } + } + } + + // close opened handler + if ($this->_resource) { + posix_kill($this->_pid, 9); + unset($this->_resource); + } + $this->_message = "Process killed"; + $this->notify(); + + return true; + } + + public function getPipe() + { + return $this->_pipes[1]; + } + + public function getStderr() + { + return $this->_pipes[2]; + } + + /** + * Check if process runninig or not + * + * @return boolean + */ + public function isRunning() + { + if (!is_resource($this->_resource)) { + return FALSE; + } + $proc_status = proc_get_status($this->_resource); + + return isset($proc_status['running']) && $proc_status['running']; + } + + /** + * Return job status + * + * @return array + */ + public function getStatus() + { + if (!is_resource($this->_resource)) { + return FALSE; + } + + return proc_get_status($this->_resource); + } + + /** + * Return job linux status + * + * @return array + */ + public function getStatus2() + { + //exec("ps -p $this->_pid -o pid,vsz=MEMORY -o user,group=GROUP -o comm,args=ARGS, -o stime,etime=RUNNING", $str); + exec("ps -p $this->_pid -o pid,vsz=MEMORY -o user,group=GROUP -o stime,etime=TIME", $str); + + $args = preg_split('/ +/', $str[0]); + $results = preg_split('/ +/', $str[1]); + if (empty($results[0])) { + unset($results[0]); + } + $args = ['pid', 'memory', 'user', 'group', 'stime', 'time']; + + return array_combine($args, $results); + } + + // посылка сигнала процессу + public function signal($sig) + { + if (!$this->isRunning()) { + return FALSE; + } + posix_kill($this->_pid, $sig); + } + + // отправка сообщения в STDIN процесса + public function message($msg) + { + if (!$this->isRunning()) { + return FALSE; + } + fwrite($this->_pipes[0], $msg); + } + + /** + * Observer factory method + * + * @param array $params + * @return \CronManager\Traits\Observer + */ + public function observerFactory(array $params) + { + switch ($params['observer']) { + case 'mysql': + $options = (array_key_exists('options', $params)) ? $params['options'] : array(); + return new Observer\Mysql($params['hash'], $this->_cmd, $this->_pid, $params['job_id'], $this->_name, $options); + case 'file': + $options = (array_key_exists('options', $params)) ? $params['options'] : array(); + return new Observer\File($params['hash'], $this->_cmd, $this->_pid, $params['job_id'], $this->_name, $options); + } + } +} \ No newline at end of file diff --git a/CronManager/Manager/Job/Observer/File.php b/CronManager/Manager/Job/Observer/File.php new file mode 100644 index 0000000..7a54cc7 --- /dev/null +++ b/CronManager/Manager/Job/Observer/File.php @@ -0,0 +1,95 @@ +_openLogFile(); + } + + /** + * Set options + * + * @param array $options + * @return \CronManager\Manager\Job\Observer\File + */ + public function setOptions(array $options) + { + parent::setOptions($options); + + if (array_key_exists('logPath', $options)) { + $this->_path = rtrim($options['logPath'], "/\\")."/"; + } else { + $this->_path = sys_get_temp_dir()."/"; + } + + return $this; + } + + /** + * Init log file + * + * @return void + */ + private function _openLogFile() + { + $this->_logFile = fopen($this->_path.$this->_action.$this->_jobId.".log", "a+"); + } + + public function onEvent($subject, $messageType) + { + $this->_update(); + $message = $subject->getMessage(); + if ($message === false || $message === null || $message === '') { + return; + } + $str = "------------------------------------".PHP_EOL; + $str .= "Process id: ".$this->_processId.", date: ".date("Y-m-d H:i:s")."\n"; + switch ($messageType) { + case 2: + $str .= " Message type: error".PHP_EOL; + break; + case 1: + default: + $str .= " Message type: info".PHP_EOL; + break; + } + + $str .= $message.PHP_EOL; + $str .= PHP_EOL; + + fwrite($this->_logFile, $str); + } + + public function __destruct() + { + parent::__destruct(); + fclose($this->_logFile); + } +} \ No newline at end of file diff --git a/CronManager/Manager/Job/Observer/Mysql.php b/CronManager/Manager/Job/Observer/Mysql.php new file mode 100644 index 0000000..1498243 --- /dev/null +++ b/CronManager/Manager/Job/Observer/Mysql.php @@ -0,0 +1,69 @@ +_logModel = $options['logModel']; + + return $this; + } + + public function onEvent($subject, $messageType) + { + $this->_update(); + $message = $subject->getMessage(); + if ($message === false || $message === null || $message === '') { + return; + } + $log = new $this->_logModel; + $log->process_id = $this->_processId; + switch ($messageType) { + case 2: + $log->type = 'error'; + break; + case 1: + default: + $log->type = 'message'; + break; + } + + $log->message = $message; + $log->time = date("Y-m-d H:i:s"); + $log->save(); + if (!$log->save()) { + print_r($log->getMessages()); + } + } + +} \ No newline at end of file diff --git a/CronManager/Manager/Job/Observer/Process.php b/CronManager/Manager/Job/Observer/Process.php new file mode 100644 index 0000000..37b3ace --- /dev/null +++ b/CronManager/Manager/Job/Observer/Process.php @@ -0,0 +1,151 @@ +_hash = $hash; + $this->_cmd = $cmd; + $this->_pid = $pid; + $this->_jobId = $jobId; + $this->_action = $action; + $this->setOptions($options); + $this->_init(); + } + + /** + * Set options + * + * @param array $options + * @return \CronManager\Manager\Job\Observer\Mysql + */ + public function setOptions(array $options) + { + if (isset($options['destructStatus'])) { + $this->_destructStatus = $options['destructStatus']; + } + if (isset($options['parentHash'])) { + $this->_parentHash = $options['parentHash']; + } + if (!isset($options['processModel'])) { + throw new \Exception('Process model not set!'); + } + + $this->_processModel = $options['processModel']; + + return $this; + } + + protected function _init() + { + if (!($process = call_user_func(array($this->_processModel, 'findFirst'), "hash = '".$this->_hash."'"))) { + $process = new $this->_processModel; + $attempt = 1; + } else { + $attempt = $process->attempt + 1; + } + + $process->hash = $this->_hash; + $process->command = $this->_cmd; + $process->job_id = $this->_jobId; + $process->pid = $this->_pid; + $process->action = $this->_action; + $process->stime = date("Y-m-d H:i:s"); + $process->time = 0; + $process->status = 'running'; + $process->phash = $this->_parentHash; + $process->attempt = $attempt; + if ($process->save()) { + $this->_processId = $process->id; + } else { + print_r($process->getMessages()); + } + } + + public function __destruct() + { + $process = call_user_func(array($this->_processModel, 'findFirst'), "id = '".$this->_processId."'"); + if (!$process) { + return false; + } + if ($process->status == 'completed') { + return false; + } + $process->status = $this->_destructStatus; + $process->time = time() - strtotime($process->stime); + $process->update(); + } + + protected function _update() + { + $process = call_user_func(array($this->_processModel, 'findFirst'), "id = '".$this->_processId."'"); + if (!$process) { + return false; + } + $process->time = time() - strtotime($process->stime); + $process->update(); + } + +} \ No newline at end of file diff --git a/CronManager/Manager/Job/Observer/Stdout.php b/CronManager/Manager/Job/Observer/Stdout.php new file mode 100644 index 0000000..04c75eb --- /dev/null +++ b/CronManager/Manager/Job/Observer/Stdout.php @@ -0,0 +1,38 @@ +_processId.", date: ".date("Y-m-d H:i:s").PHP_EOL; + switch ($messageType) { + case 2: + $str .= "Message type: error".PHP_EOL; + break; + case 1: + default: + $str .= "Message type: info".PHP_EOL; + break; + } + + $str .= $subject->getMessage().PHP_EOL; + $str .= PHP_EOL; + + echo $str; + + $this->_update(); + } +} \ No newline at end of file diff --git a/CronManager/Manager/JobManager.php b/CronManager/Manager/JobManager.php new file mode 100644 index 0000000..3bcd70a --- /dev/null +++ b/CronManager/Manager/JobManager.php @@ -0,0 +1,436 @@ +setDi($dependencyInjector); + $this->_init(); + } + + protected function _init() + { + $this->_config = $this->getDi()->get('config'); + $this->_initConfig(); + + $this->_producer = new \Thumper\Producer($this->getDi()->get('thumperConnection')->getConnection()); + $this->_producer->setExchangeOptions(['name' => $this->_config->rabbitmq->managerExchangeName, 'type' => $this->_config->rabbitmq->exchangeType]); + } + + protected function _initConfig() + { + $settings = $this->_config->daemon->settings; + $type = $settings['type']; + switch ($type) { + case 'model': + $environment = $settings['environment']; + $model = $settings['model']; + $params = call_user_func(array($model, 'findOne'), "status='1' AND environment => '".$environment."'"); + break; + case 'array': + default: + $params = $settings['params']; + break; + } + if (!$params || !is_array($params)) { + return; + } + if (isset($params['max_pool'])) { + $this->_MAX_POOL = (int) $params['max_pool']; + } + if (isset($params['max_memory_mb'])) { + $this->_MAX_MEMORY_MB = (int) $params['max_memory_mb']; + } + if (isset($params['max_memory_percentage'])) { + $this->_MAX_MEMORY_PERCENTAGE = (int) $params['max_memory_percentage']; + } + if (isset($params['max_cpu_load'])) { + $this->_MAX_CPU_LOAD = (int) $params['max_cpu_load']; + } + } + + public function __destruct() + { + // destroy pool + foreach (array_keys($this->_pool) as $pid) { + $this->stopJob($pid); + } + } + + /** + * Check running jobs + * + * @return integer + */ + protected function checkJobs() + { + $running_jobs = 0; + foreach ($this->_pool as $pid => $job) { + if (!$job->isRunning()) { + echo "Stopping job ".$this->_pool[$pid]->name()." ($pid)" . PHP_EOL; + $this->stopJob($pid); + } else { + $running_jobs++; + } + } + + return $running_jobs; + } + + /** + * Return free pool index + * + * @return integer + */ + protected function getFreeIndex() + { + return (count($this->_pool) < self::POOL_MAX) ? true : false; + } + + /** + * Run new job + * + * @param string $cmd + * @param string $name + * @param array $observers + * + * @return boolean|integer + */ + public function startJob($cmd, $name = 'job', array $observers = array()) + { + // broadcast existing jobs + $this->checkJobs(); + + $free_pool_slots = self::POOL_MAX - count($this->_pool); + + if ($free_pool_slots <= 0) { + // output error "no free slots in the pool" + return false; + } + + if (!$this->getFreeIndex()) { + return false; + } + + $job = new Job($cmd, $name); + $job->execute(); + $pid = $job->pid(); + echo "Starting job $name ($pid) ". date("H:i:s") . PHP_EOL; + + $this->_pool[$pid] = $job; + $this->_streams[$pid] = $this->_pool[$pid]->getPipe(); + $this->_stderr[$pid] = $this->_pool[$pid]->getStderr(); + + foreach ($observers as $params) { + $observer = $this->_pool[$pid]->observerFactory($params); + $this->_pool[$pid]->addObserver($observer); + } + $job->notify(); + + return $pid; + } + + /** + * Destroy(kill) job(daemon) + * + * @param integer $pid + * @return boolean + */ + public function stopJob($pid) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + unset($this->_streams[$pid]); + unset($this->_stderr[$pid]); + unset($this->_pool[$pid]); + } + + /** + * Get job + * + * @param integer $pid + * @return \CronManager\Manager\Job|boolean + */ + public function getJob($pid) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + + return $this->_pool[$pid]; + } + + /** + * Retrun job name + * + * @param integer $pid + * @return boolean|string + */ + public function name($pid) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + + return $this->_pool[$pid]->name(); + } + + public function pipeline($pid, $nohup = FALSE) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + + return $this->_pool[$pid]->pipeline($nohup); + } + + public function stderr($pid, $nohup = FALSE) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + + return $this->_pool[$pid]->stderr($nohup); + } + + protected function broadcastMessage($msg) + { + // sends selected signal to all child processes + foreach ($this->_pool as $pid => $job) { + $job->message($msg); + } + } + + protected function broadcastSignal($sig) + { + // sends selected signal to all child processes + foreach ($this->_pool as $pid => $job) { + $job->signal($sig); + } + } + + // если была зарегистрирована пользовательская функция разбора - используем ее + protected function dispatch($params) + { + if (is_callable($this->_dispatch_function)) { + call_user_func($this->_dispatch_function, $params); + } else { + if (is_array($params)) { + print_r($params); + } else { + echo $params; + } + } + } + + // регистрация пользовательской функции для разбора + public function registerDispatch($callable) + { + if (is_callable($callable)) { + $this->_dispatch_function = $callable; + } else { + trigger_error("$callable is not callable func", E_USER_WARNING); + } + } + + // разбираем пользовательский ввод + protected function dispatchMain(array $params, $pid) + { + $val = (array_key_exists('name', $params)) ? $params['name'] : false; + switch ($params['type']) { + case "exit": + $this->broadcastSignal(SIGTERM); + $this->_is_terminated = TRUE; + break; + case "test": + echo 'sending test' . PHP_EOL; + $this->broadcastMessage('test'); + $this->broadcastSignal(SIGUSR1); + break; + case 'kill': + if (array_key_exists('pid', $params)) { + $kPid = (int) $params['pid']; + } else { + $kPid = ($val !== '' && (int) $val >= 0) ? (int) $val : -1; + } + if ($kPid >= 0) { + $this->stopJob($kPid); + } + break; + case "start": + $jobCmd = implode(" ", $params['args']); + $observers = array_key_exists('observers', $params) ? $params['observers'] : array(); + $this->startJob($jobCmd, $val, $observers); + break; + case "show": + $this->show($val); + break; + case "check": + $result = $this->getFreeIndex(); + if ($pid > 0) { + $this->_pool[$pid]->message(json_encode(['sender' => 'manager', 'type' => 'freePool', 'result' => $result])); + $this->_pool[$pid]->signal(SIGUSR1); + } + break; + default: + $this->dispatch($params); + break; + } + + return FALSE; + } + + /** + * Show process status by pid + * + * @param integer $pid + * @return array + */ + public function show($pid = null) + { + if ($this->checkJobs() == 0) { + echo "No running jobs\n"; + } + + foreach ($this->_pool as $pid => $job) { + var_dump($job->getStatus2()); + } + } + + /** + * Run manager + * + * @return void + */ + public function process() + { + stream_set_blocking(STDIN, 0); + + $write = NULL; + $except = NULL; + + while (!$this->_is_terminated) { + $this->_queuePublish(); + $this->_stdRead($write, $except); + $this->checkJobs(); + usleep(200000); + } + } + + protected function _queuePublish() + { + if (!$this->existsFreePool()) { + return; + } + $this->_producer->publish(1); + } + + protected function _stdRead($write, $except) + { + /* + из-за особенности функции stream_select приходится особым образом работать с массивами дескрипторов + */ + $read = $this->_streams; + $except = $this->_stderr; + $read[0] = STDIN; + + if (!(is_array($read) && count($read) > 0)) { + return; + } + if (false === ($num_changed_streams = stream_select($read, $write, $except, 0, 300000))) { + echo "Some stream error\n"; + return; + } + if ($num_changed_streams <= 0) { + return; + } + // есть что почитать + if (!(is_array($read) && count($read) > 0)) { + return; + } + $cmp_array = $this->_streams; + $cmp_array[0] = STDIN; + foreach ($read as $resource) { + $pid = array_search($resource, $cmp_array, TRUE); + if ($pid === FALSE) { + continue; + } + if ($pid == 0) { + // stdin + $content = ''; + while ($cmd = fgets(STDIN)) { + if (!$cmd) { + break; + } + $content .= $cmd; + } + $content = trim($content); + if ($content) { + // если Process Manager словил на вход какую-то строчку - парсим и решаем что делать + $params = []; + $parts = explode(" ", $content); + $params['type'] = isset($parts[0]) ? array_shift($parts) : false; + $params['name'] = isset($parts[1]) ? array_shift($parts) : false; + $params['args'] = $parts; + + $this->dispatchMain($params, $pid); + } + //echo "stdin> " . $cmd; + } else { + // читаем сообщения процессов + $pool_content = $this->pipeline($pid, TRUE); + $job_name = $this->name($pid); + + if (($params = json_decode($pool_content, true))) { + $this->dispatchMain($params, $pid); + } else { + var_dump($pool_content); + } + + $pool_content = $this->stderr($pid, TRUE); + if ($pool_content) { + echo $job_name ." ($pool_index)" . ' [STDERR]: ' . $pool_content."\n"; + } + } + } + + } + +} \ No newline at end of file diff --git a/CronManager/Manager/QueueManager.php b/CronManager/Manager/QueueManager.php new file mode 100644 index 0000000..f5d84a7 --- /dev/null +++ b/CronManager/Manager/QueueManager.php @@ -0,0 +1,361 @@ +setDi($dependencyInjector); + $this->_init(); + } + + protected function _init() + { + $config = $this->getDi()->get('config'); + $this->_consumer = new \Thumper\Consumer($this->getDi()->get('thumperConnection')->getConnection()); + $this->_consumer->setExchangeOptions(['name' => $config->rabbitmq->exchangeName, 'type' => $config->rabbitmq->exchangeType]); + $this->_consumer->setQueueOptions(['name' => $config->rabbitmq->queueName]); + //$this->_consumer->setRoutingKey(); + $this->_consumer->setCallback($this->_callback()); + } + + protected function _callback() + { + $manager = $this; + $callback = function ($params) use ($manager) { + $params = igbinary_unserialize($params); + $manager->dispatchMessage($params); + }; + + return $callback; + } + + public function __destruct() + { + // destroy pool + foreach (array_keys($this->_pool) as $pid) { + $this->stopJob($pid); + } + } + + /** + * Check running jobs + * + * @return integer + */ + protected function checkJobs() + { + $running_jobs = 0; + foreach ($this->_pool as $pid => $job) { + if (!$job->isRunning()) { + echo "Stopping job ".$this->_pool[$pid]->name()." ($pid)" . PHP_EOL; + $this->stopJob($pid); + } else { + $running_jobs++; + } + } + + return $running_jobs; + } + + /** + * Return free pool index + * + * @return integer + */ + protected function existsFreePool() + { + return (count($this->_pool) < self::POOL_MAX) ? true : false; + } + + /** + * Run new job + * + * @param string $cmd + * @param string $name + * @param array $observers + * + * @return boolean|integer + */ + public function startJob($cmd, $name = 'job', array $observers = array()) + { + // broadcast existing jobs + $this->checkJobs(); + + $free_pool_slots = self::POOL_MAX - count($this->_pool); + + if ($free_pool_slots <= 0) { + // output error "no free slots in the pool" + return false; + } + + if (!$this->existsFreePool()) { + return false; + } + + $job = new Job($cmd, $name); + $job->execute(); + $pid = $job->pid(); + echo "Starting job $name ($pid) ". date("H:i:s") . PHP_EOL; + + $this->_pool[$pid] = $job; + $this->_streams[$pid] = $this->_pool[$pid]->getPipe(); + $this->_stderr[$pid] = $this->_pool[$pid]->getStderr(); + + foreach ($observers as $params) { + $observer = $this->_pool[$pid]->observerFactory($params); + $this->_pool[$pid]->addObserver($observer); + } + $job->notify(); + + return $pid; + } + + /** + * Destroy(kill) job(daemon) + * + * @param integer $pid + * @return boolean + */ + public function stopJob($pid) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + unset($this->_streams[$pid]); + unset($this->_stderr[$pid]); + unset($this->_pool[$pid]); + } + + /** + * Get job + * + * @param integer $pid + * @return \CronManager\Manager\Job|boolean + */ + public function getJob($pid) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + + return $this->_pool[$pid]; + } + + /** + * Retrun job name + * + * @param integer $pid + * @return boolean|string + */ + public function name($pid) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + + return $this->_pool[$pid]->name(); + } + + public function pipeline($pid, $nohup = FALSE) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + + return $this->_pool[$pid]->pipeline($nohup); + } + + public function stderr($pid, $nohup = FALSE) + { + if (!isset($this->_pool[$pid])) { + return FALSE; + } + + return $this->_pool[$pid]->stderr($nohup); + } + + protected function broadcastMessage($msg) + { + // sends selected signal to all child processes + foreach ($this->_pool as $pid => $job) { + $job->message($msg); + } + } + + protected function broadcastSignal($sig) + { + // sends selected signal to all child processes + foreach ($this->_pool as $pid => $job) { + $job->signal($sig); + } + } + + /** + * + * + * @param array $params + * @return void + */ + public function dispatchMessage(array $params) + { + switch ($params['type']) { + case "exit": + $this->broadcastSignal(SIGTERM); + $this->_is_terminated = TRUE; + break; + case 'kill': + $this->stopJob($params['pid']); + break; + case "start": + $jobCmd = implode(" ", $params['args']); + $observers = isset($params['observers']) ? $params['observers'] : array(); + $this->startJob($jobCmd, $params['name'], $observers); + break; + default: + $this->dispatch($params); + break; + } + } + + /** + * Show process status by pid + * + * @param integer $pid + * @return array + */ + public function show($pid = null) + { + if ($this->checkJobs() == 0) { + echo "No running jobs\n"; + } + + foreach ($this->_pool as $pid => $job) { + var_dump($job->getStatus2()); + } + } + + /** + * Run manager + * + * @return void + */ + public function process() + { + stream_set_blocking(STDIN, 0); + + $write = NULL; + $except = NULL; + + while (!$this->_is_terminated) { + $this->_queueRead(); + $this->_stdRead($write, $except); + $this->checkJobs(); + //usleep(200000); + } + } + + protected function _queueRead() + { + if (!$this->existsFreePool()) { + return; + } + $this->_consumer->consume(1); + } + + protected function _stdRead($write, $except) + { + /* + из-за особенности функции stream_select приходится особым образом работать с массивами дескрипторов + */ + $read = $this->_streams; + $except = $this->_stderr; + $read[0] = STDIN; + + if (!(is_array($read) && count($read) > 0)) { + return; + } + if (false === ($num_changed_streams = stream_select($read, $write, $except, 0, 300000))) { + echo "Some stream error\n"; + return; + } + if ($num_changed_streams <= 0) { + return; + } + // есть что почитать + if (!(is_array($read) && count($read) > 0)) { + return; + } + $cmp_array = $this->_streams; + $cmp_array[0] = STDIN; + foreach ($read as $resource) { + $pid = array_search($resource, $cmp_array, TRUE); + if ($pid === FALSE) { + continue; + } + if ($pid == 0) { + // stdin + $content = ''; + while ($cmd = fgets(STDIN)) { + if (!$cmd) { + break; + } + $content .= $cmd; + } + $content = trim($content); + if ($content) { + // если Process Manager словил на вход какую-то строчку - парсим и решаем что делать + $params = []; + $parts = explode(" ", $content); + $params['type'] = isset($parts[0]) ? array_shift($parts) : false; + $params['name'] = isset($parts[1]) ? array_shift($parts) : false; + $params['args'] = $parts; + + $this->dispatchMain($params, $pid); + } + //echo "stdin> " . $cmd; + } else { + // читаем сообщения процессов + $pool_content = $this->pipeline($pid, TRUE); + $job_name = $this->name($pid); + + if ($pool_content) { + echo $pool_content; + } + + $pool_content = $this->stderr($pid, TRUE); + if ($pool_content) { + echo $job_name ." ($pool_index)" . ' [STDERR]: ' . $pool_content."\n"; + } + } + } + + } + +} \ No newline at end of file diff --git a/CronManager/Queue/Job/Base.php b/CronManager/Queue/Job/Base.php new file mode 100644 index 0000000..ae33ee2 --- /dev/null +++ b/CronManager/Queue/Job/Base.php @@ -0,0 +1,81 @@ +_di->get('db'); + $db->connect(); + $process = \CronManager\Queue\Model\Process::findFirst("hash = '".$hash."'"); + if (!$process) { + $this->_message = "Process by hash '".$hash."' not found, try again!"; + $this->notify(); + sleep(1); + if ($count == 5) { + throw new \Exception("Process by hash '".$hash."' not found after ".$count." attemps!"); + } + return $this->_findByHash($hash, ++$count); + } + } + + return $process; + } + + + /** + * Find process by hash + * + * @param string $hash + * @param string $statэяus + * @return boolean + */ + protected function _updateStatus($hash, $status) + { + $process = $this->_findByHash($hash); + sleep(1); + $process->status = $status; + return $process->save(); + } + + /** + * Return process id by hash + * + * @param $hash + * @return mixed + */ + protected function _getProcessId($hash) + { + $process = $this->_findByHash($hash); + return $process->id; + } +} \ No newline at end of file diff --git a/CronManager/Queue/Job/Consumer.php b/CronManager/Queue/Job/Consumer.php new file mode 100644 index 0000000..6fb5bf3 --- /dev/null +++ b/CronManager/Queue/Job/Consumer.php @@ -0,0 +1,133 @@ +_config = $this->getDi()->get('config'); + $this->_socketFile = $this->_config->daemon->socket; + + $this->_consumerJob = new \Thumper\Consumer($this->_di->get('thumperConnection')->getConnection()); + $this->_consumerJob->setExchangeOptions(array('name' => $this->_config->rabbitmq->jobExchangeName, 'type' => $this->_config->rabbitmq->exchangeType)); + $this->_consumerJob->setQueueOptions(array('name' => 'cron-cli-queue-job')); + //$this->_consumerJob->setRoutingKey(); + $this->_consumerJob->setCallback($this->_costumerHandler()); + } + + /** + * Start consumer process + * + * @return void + */ + public function run() + { + $this->_consumeJob(); + } + + /** + * Wait access to new job in queue by cron manager + * + * @return bool + */ + public function consumeManager() + { + if ($this->isTerminated()) { + unset($this->_consumerJob); + return false; + } + $this->_message = "Wait free pool"; + $this->notify(); + + $consumer = new \Thumper\Consumer($this->_di->get('thumperConnection')->getConnection()); + $consumer->setExchangeOptions(array('name' => $this->_config->rabbitmq->managerExchangeName, 'type' => $this->_config->rabbitmq->exchangeType)); + $consumer->setQueueOptions(array('name' => 'cron-cli-queue-manager')); + $consumer->setCallback($this->_managerHandler()); + $consumer->consume(1); + + return true; + } + + /** + * Wait new job in queue and send command to cron manager + * + * @return void + */ + protected function _consumeJob() + { + $this->_message = "Wait new job task"; + $this->notify(); + $this->_consumerJob->consume(0); + } + + /** + * Handler to send new job to cron manager + * + * @return \Closure + */ + protected function _costumerHandler() + { + $client = $this; + $callback = function ($params) use ($client) { + if (!$client->consumeManager()) { + $client->setMessage("Process terminated"); + $client->notify(); + return false; + } + + $params = igbinary_unserialize($params); + $client->write(json_encode($params)); + + $client->setMessage("Wait new job task"); + $client->notify(); + usleep(200000); + + return true; + }; + + return $callback; + } + + /** + * Handler to process access to new job fron cron manager + * + * @return \Closure + */ + protected function _managerHandler() + { + $callback = function ($params) { + return true; + }; + + return $callback; + } +} \ No newline at end of file diff --git a/CronManager/Queue/Job/Inspector.php b/CronManager/Queue/Job/Inspector.php new file mode 100644 index 0000000..a219512 --- /dev/null +++ b/CronManager/Queue/Job/Inspector.php @@ -0,0 +1,221 @@ +_config = $this->getDi()->get('config'); + $this->_socketFile = $this->_config->daemon->socket; + $this->_stopRunningProcesses(); + while (!$this->isTerminated()) { + $this->_stopProcesses(); + $this->_runProcesses(); + $this->_ttlProcesses(); + $this->_rerunProcesses(); + usleep(500000); + } + } + + /** + * Stop running processes + * + * @return void + */ + protected function _stopRunningProcesses() + { + if (!($processes = $this->_getProcesses('running')) || $processes->count() == 0) { + return; + } + + foreach ($processes as $process) { + $this->_stopProcess($process); + } + } + + + /** + * Stop processes with status stop + * + * @return void + */ + protected function _stopProcesses() + { + if (!($processes = $this->_getProcesses('stop')) || $processes->count() == 0) { + return; + } + + foreach ($processes as $process) { + $this->_stopProcess($process); + } + } + + /** + * Stop processes by ttl + * + * @return void + */ + protected function _ttlProcesses() + { + if (!($processes = $this->_getProcesses('running')) || $processes->count() == 0) { + return; + } + + foreach ($processes as $process) { + $ttl = Job::findFirst("id = '".$process->job_id."'")->ttl; + if ($ttl > 0 && ($ttl < (time() - strtotime($process->stime)))) { + $this->_stopProcess($process, 'aborted'); + } + } + } + + /** + * Run processes with status run + * + * @return void + */ + protected function _runProcesses() + { + if(!($processes = $this->_getProcesses('run')) || $processes->count() == 0) { + return; + } + foreach ($processes as $process) { + $this->_runProcess($process); + } + } + + /** + * Run processes with status aborted + * + * @return void + */ + protected function _rerunProcesses() + { + if(!($processes = $this->_getProcesses('aborted')) || $processes->count() == 0) { + return; + } + foreach ($processes as $process) { + $this->_runProcess($process); + } + } + + /** + * Stop process by pid + * + * @param \CronManager\Queue\Model\Process $pid + * @param string $status + * @return \CronManager\Queue\Job\Queue\Inspector + */ + protected function _stopProcess(Process $process, $status = 'stopped') + { + $task = []; + $task['type'] = 'kill'; + $task['pid'] = $process->pid; + + //$this->_addQueue($task); + $this->_message = "Stop process with id: ".$process->id.", pid: ".$process->pid.", name: ".$process->action; + $this->notify(); + $this->write(json_encode($task)); + $this->_changeStatus($process->id, $status); + + return $this; + } + + /** + * Add process to queue for run + * + * @param \CronManager\Queue\Model\Process $process + * @return \CronManager\Queue\Job\Queue\Inspector + */ + protected function _runProcess(Process $process) + { + $task = []; + $job = Job::findFirst("id = '".$process->job_id."'"); + if ($process->phash == 1) { + $cmd = explode(" ", $process->command); + } else { + //$cmd = Process::findFirst("hash = '".$process->phash."'")->command; + //$cmd = trim(str_replace($process->job_id." ".$process->phash, "", $process->command)); + $cmd = explode(" ", $process->command); + } + if ($job->max_attempts == $process->attempt) { + $this->_changeStatus($process->id, 'error'); + return false; + } + $task['type'] = 'start'; + $task['name'] = $process->action; + $task['args'] = $cmd; + $task['observers'] = [ + [ + 'observer' => 'mysql', + 'hash' => $process->hash, + 'job_id' => $process->job_id, + 'options' => [ + 'processModel' => '\CronManager\Queue\Model\Process', + 'logModel' => '\CronManager\Queue\Model\Log', + 'parentHash' => $process->phash + ] + ] + ]; + $this->_message = "Add new job `".$task['name']."` in queue"; + $this->notify(); + $this->_addQueue($task); + $this->_changeStatus($process->id, 'waiting'); + + return $this; + } + + /** + * Return processes by status + * + * @param string $status + * @return array + */ + protected function _getProcesses($status) + { + $processes = Process::find("status = '".$status."'"); + + return $processes; + } + + /** + * Change process status + * + * @param integer $id + * @param string $status + * @return void + */ + protected function _changeStatus($id, $status) + { + $process = Process::findFirst("id = '".$id."'"); + $process->status = $status; + $res = $process->update(); + } +} + diff --git a/CronManager/Queue/Job/Manager.php b/CronManager/Queue/Job/Manager.php new file mode 100644 index 0000000..a2d98ff --- /dev/null +++ b/CronManager/Queue/Job/Manager.php @@ -0,0 +1,769 @@ +setDi($dependencyInjector); + } + + /** + * Destrcuctor + */ + public function __destruct() + { + parent::__destruct(); + $this->_daemonClose(); + } + + /** + * Initialize cron manager queue + * + * @return void + */ + protected function _initQueue() + { + $this->_producer = new \Thumper\Producer($this->getDi()->get('thumperConnection')->getConnection()); + $this->_producer->setExchangeOptions(['name' => $this->_config->rabbitmq->managerExchangeName, 'type' => $this->_config->rabbitmq->exchangeType]); + } + + /** + * Initialize cron manager configuration from database + * + * @return void + */ + protected function _initConfig() + { + $settings = $this->_config->daemon->settings; + $type = $settings['type']; + switch ($type) { + case 'model': + $environment = $settings['environment']; + $model = $settings['model']; + try { + $params = call_user_func([$model, 'findFirst'], "status='1' AND environment='".$environment."'"); + } catch (\Exception $e) { + $this->_message = $e->getMessage().PHP_EOL; + $this->notify(static::$MESSAGE_ERROR); + $params = false; + } + if ($params) { + if ($params->action_status == self::STATUS_RESTART && $this->_configFlag) { + $params->action_status = self::STATUS_RUNNING; + $params->update(); + } + $params = $params->toArray(); + } + break; + case 'array': + default: + $params = $settings['params']; + break; + } + if (!$params || !is_array($params)) { + return; + } + if (isset($params['max_pool'])) { + $this->_MAX_POOL = (int) $params['max_pool']; + } + if (isset($params['min_free_memory_mb'])) { + $this->_MIN_FREE_MEMORY_MB = (int) $params['min_free_memory_mb']; + } + if (isset($params['min_free_memory_percentage'])) { + $this->_MIN_FREE_MEMORY_PERCENTAGE = (int) $params['min_free_memory_percentage']; + } + if (isset($params['max_cpu_load'])) { + $this->_MAX_CPU_LOAD = (int) $params['max_cpu_load']; + } + if (isset($params['max_cpu_load'])) { + $this->_MAX_CPU_LOAD = (int) $params['max_cpu_load']; + } + if (isset($params['action_status'])) { + $this->_ACTION_STATUS = (int) $params['action_status']; + } + if ($this->_configFlag) { + $this->_configFlag = false; + } + } + + /** + * Initialize cron manager loging observer + * + * @return void + */ + protected function _initObserver() + { + $this->addObserver(new \CronManager\Tools\Observer\Stdout()); + } + + /** + * Notify about all running processes + * + * @return void + */ + protected function _eventStatistic() + { + $this->_message = "Active socket connections: ".count($this->_socketConnections).PHP_EOL; + $this->_message .= "Active jobs: ".count($this->_pool).PHP_EOL; + $this->_message .= "Free system memory: ".$this->_getFreeMemory().PHP_EOL; + $this->_initConfig(); + + $allocateMemory = 0; + foreach ($this->_pool as $pid => $job) { + if (!$job->isRunning()) { + $this->_message .= "Stopping job ".$this->_pool[$pid]->name()." ($pid)".PHP_EOL; + $this->stopJob($pid); + } else { + $job->setMessage(false); + $job->notify(); + $status = $job->getStatus2(); + $memory = intval($status['memory']); + $allocateMemory += $memory; + if ($memory > 500000) { + $this->_message .= "Alert process ".$job->name()." with pid ".$pid." allocate ".$memory." memory!"; + } else { + //echo "Process ".$job->name()." with pid ".$pid." allocate ".$memory." memory!\n"; + } + } + } + $this->_message .= "Allocate system memory: ".$allocateMemory.PHP_EOL; + if ($this->_queuePublishErrorMessage) { + $this->_message .= $this->_queuePublishErrorMessage.PHP_EOL; + } + $this->_message .= PHP_EOL; + $this->notify(); + } + + /** + * Run manager process + * + * @return boolean + */ + public function process() + { + $this->_streamTimeoutSecond = 0; + $this->_streamTimeoutMicrosecond = 300000; + + $this->_config = $this->getDi()->get('config'); + + $this->_daemonInit(); + $this->_initConfig(); + if ($this->_ACTION_STATUS == self::STATUS_STOP) { + $this->_message = "Exit from cron manager, because action in 'STOP' status!"; + $this->notify(); + return false; + } + $this->_initQueue(); + + $write = NULL; + $except = NULL; + $this->_runMainProcesses(); + sleep(2); + while (!$this->_is_terminated) { + $this->_accessJob = $this->_queuePublish(); + $this->_eventLoop(); + $this->_stdRead($write, $except); + $this->checkJobs(); + $this->_checkMainProcesses(); + $this->_checkActionStatus(); + usleep(200000); + } + $this->_message = "Exit from cron manager"; + $this->notify(); + + return true; + } + + /** + * Run all main processes + * + * @return void + */ + protected function _runMainProcesses() + { + if ($this->_ACTION_STATUS == self::STATUS_PENDING) { + $this->_message = "Cron pending"; + $this->notify(); + return; + } + $this->_runMainProcess('producer'); + $this->_runMainProcess('consumer'); + $this->_runMainProcess('inspector'); + } + + /** + * Run main process by his special key + * + * @param string $process + * @return void + */ + protected function _runMainProcess($process) + { + $documentRoot = $this->_config->application->documentRoot; + switch ($process) { + case 'producer': + $this->_mainProcesses['producer'] = $this->_runProducer($documentRoot); + break; + case 'consumer': + $this->_mainProcesses['consumer'] = $this->_runConsumer($documentRoot); + break; + case 'inspector': + $this->_mainProcesses['inspector'] = $this->_runInspector($documentRoot); + break; + } + } + + /** + * Run producer main process + * + * @return integer + */ + protected function _runProducer($documentRoot) + { + $this->_message = "Run main process 'producer'"; + $this->notify(); + $observers = []; + /*$observers = [ + [ + 'observer' => 'mysql', + 'hash' => 'producer', + 'job_id' => 0, + 'options' => [ + 'processModel' => '\CronManager\Queue\Model\Process', + 'logModel' => '\CronManager\Queue\Model\Log' + ] + ] + ];*/ + + return $this->startJob($documentRoot."/index.php cron-producer init", 'cron_producer', $observers); + } + + /** + * Run consumer main process + * + * @return integer + */ + protected function _runConsumer($documentRoot) + { + $this->_message = "Run main process 'consumer'"; + $this->notify(); + $observers = []; + + return $this->startJob($documentRoot."/index.php cron-consumer init", 'cron_consumer', $observers); + } + + /** + * Run inspector main process + * + * @return integer + */ + protected function _runInspector($documentRoot) + { + $this->_message = "Run main process 'inspector'"; + $this->notify(); + $observers = []; + + return $this->startJob($documentRoot."/index.php cron-inspector init", 'cron_inspector', $observers); + } + + /** + * Checking main processes and run it if it down + * + * @return void + */ + protected function _checkMainProcesses() + { + if ($this->_ACTION_STATUS == self::STATUS_PENDING) { + $this->_message = "Cron pending"; + $this->notify(); + return; + } + foreach ($this->_mainProcesses as $process => $pid) { + if ($this->getJob($pid)) { + continue; + } + $this->_message = "Main process '".$process."' was down"; + $this->notify(); + $this->_runMainProcess($process); + } + } + + /** + * Chech action status + * + * @return void + */ + protected function _checkActionStatus() + { + if ($this->_ACTION_STATUS == self::STATUS_STOP || $this->_ACTION_STATUS == self::STATUS_RESTART || $this->_ACTION_STATUS == self::STATUS_PENDING) { + $this->_stopAllJobs(); + if ($this->_ACTION_STATUS == self::STATUS_STOP || $this->_ACTION_STATUS == self::STATUS_RESTART) { + $this->_is_terminated = true; + } + } + } + + /** + * Stop all cron procceses and terminate manager + * + * @return void + */ + protected function _stopAllJobs() + { + $this->_message = "Stop all cron jobs"; + $this->notify(); + + $this->_stopMainProcesses(); + + foreach ($this->_pool as $pid => $job) { + //$job->signal(SIGINT); + $this->_message = "Stoping process '".$job->getName()."'"; + $this->notify(); + $this->stopJob($pid); + } + } + + /** + * Stop all main processes + * + * @return void + */ + protected function _stopMainProcesses() + { + $this->_stopMainProcess('producer'); + $this->_stopMainProcess('consumer'); + $this->_stopMainProcess('inspector'); + } + + /** + * Stop main process by his special key + * + * @param string $process + * @return void + */ + protected function _stopMainProcess($process) + { + switch ($process) { + case 'producer': + $this->_stopProducer(); + break; + case 'consumer': + $this->_stopConsumer(); + break; + case 'inspector': + $this->_stopInspector(); + break; + } + } + + /** + * Stop producer main process + * + * @return void + */ + protected function _stopProducer() + { + if (!$this->getJob($this->_mainProcesses['producer'])) { + return; + } + $this->_message = "Stop main process 'producer'"; + $this->notify(); + + $pid = $this->_mainProcesses['producer']; + $this->_pool[$pid]->signal(SIGINT); + $this->stopJob($pid); + } + + /** + * Run consumer main process + * + * @return void + */ + protected function _stopConsumer() + { + if (!$this->getJob($this->_mainProcesses['consumer'])) { + return; + } + $this->_message = "Stop main process 'consumer'"; + $this->notify(); + + $pid = $this->_mainProcesses['consumer']; + $this->_pool[$pid]->signal(SIGINT); + + if (!$this->_accessJob) { + $this->_producer->publish(1); + sleep(1); + } + $producer = new \Thumper\Producer($this->getDi()->get('thumperConnection')->getConnection()); + $producer->setExchangeOptions(['name' => $this->_config->rabbitmq->jobExchangeName, 'type' => $this->_config->rabbitmq->exchangeType]); + $producer->publish(igbinary_serialize([])); + + $this->stopJob($pid); + } + + /** + * Stop inspector main process + * + * @return void + */ + protected function _stopInspector() + { + if (!$this->getJob($this->_mainProcesses['inspector'])) { + return; + } + $this->_message = "Stop main process 'inspector'"; + $this->notify(); + + $pid = $this->_mainProcesses['inspector']; + $this->_pool[$pid]->signal(SIGINT); + $this->stopJob($pid); + } + + + /** + * Process new message + * + * @return void + */ + protected function _processMessage($pid, $message, $type) + { + if (empty($message)) { + return; + } + $name = $this->name($pid); + switch ($name) { + case 'cron_consumer': + case 'cron_producer': + case 'cron_inspector': + $this->_message = strtoupper(str_replace("_", " ", $name))." PID: ".$pid.", message: ".$message.PHP_EOL; + $this->notify(); + break; + } + } + + /** + * Start new process job + * + * @return integer|boolean + */ + public function startJob($cmd, $name = 'job', array $observers = array()) + { + if (!($pid = parent::startJob($cmd, $name , $observers))) { + $this->_message = "Job ".$name." not started".PHP_EOL; + $this->notify(); + return false; + } + $this->_queueFlag = true; + + return $pid; + } + + /** + * Stop process job by his pid + * + * @return boolean + */ + public function stopJob($pid) + { + if (!parent::stopJob($pid)) { + return false; + } + --$this->_queueCount; + + return true; + } + + /** + * Initialize daemon + * + * @return void + */ + protected function _daemonInit() + { + $this->_eventTimerInterval = 5000000; + $this->_pidFile = $this->_config->daemon->pid; + $this->_logFile = $this->_config->daemon->log; + $this->_errorLogFile = $this->_config->daemon->error; + $this->_socketFile = $this->_config->daemon->socket; + $this->_lockFile = $this->_config->daemon->lock; + + $this->_forkInit(); + $this->_locking(); + $this->_initPIDFile(); + $this->_initLogs(); + $this->_initObserver(); + $this->_initSocket(); + $this->_initEventBase(); + $this->_initTimer(); + + $this->_DESCRIPTORS = ['read_persist' => EV_READ | EV_PERSIST]; + foreach ($this->_DESCRIPTORS as $name => $descriptor) { + $handler = $this->_getEventHandler($name); + $this->_setEvent($name, $this->_socket, $descriptor, $handler); + } + } + + /** + * Close daemon + * + * @return void + */ + protected function _daemonClose() + { + $this->_closeEvents(); + $this->_closeBuffers(); + $this->_closeSocketConnections(); + $this->_closeSocket(); + $this->_closeLogs(); + $this->_closePIDFile(); + $this->_unlocking(); + } + + /** + * Publish new message in queue with new job access + * + * @return boolean + */ + protected function _queuePublish() + { + if (!$this->_queueFlag) { + $this->_queuePublishErrorMessage = "Queue flag false status".PHP_EOL; + return false; + } + + if ($this->_MAX_CPU_LOAD !== 0) { + $load = sys_getloadavg(); + if ($load[0] > $this->_MAX_CPU_LOAD) { + $this->_queuePublishErrorMessage = "Cpu load ".$load.PHP_EOL; + return false; + } + } + if ($this->_MIN_FREE_MEMORY_PERCENTAGE !== 0) { + $memory = $this->_getFreeMemory(true); + if ($memory < $this->_MIN_FREE_MEMORY_PERCENTAGE) { + $this->_queuePublishErrorMessage = "Free memory ".$memory."%".PHP_EOL; + return false; + } + } + if ($this->_MIN_FREE_MEMORY_MB !== 0) { + $memory = $this->_getFreeMemory(false); + if ($memory < $this->_MIN_FREE_MEMORY_MB) { + $this->_queuePublishErrorMessage = "Free memory ".$memory." Mb".PHP_EOL; + return false; + } + } + if (!$this->_existsFreePool() || ($this->_MAX_POOL - $this->_queueCount) == 0) { + $this->_queuePublishErrorMessage = "Max pool ".$this->_queueCount.PHP_EOL; + return false; + } + + $this->_message = "Publish access to add new job".PHP_EOL; + $this->notify(); + + $this->_queuePublishErrorMessage = false; + $this->_producer->publish(1); + ++$this->_queueCount; + $this->_queueFlag = false; + + return true; + } + + /** + * Return free system memory + * + * @param boolean $percentage + * @return integer + */ + protected function _getFreeMemory($percentage = false) + { + if ($percentage) { + exec("free | grep Mem | awk '{print $4/$2 * 100.0}'", $memory); + } else { + exec("free | grep Mem | awk '{print $4}'", $memory); + } + + return $memory[0]; + } + + /** + * @param $name + * @return array + */ + protected function _getEventHandler($name) + { + switch ($name) { + case 'read_persist': + return [$this, 'onAccept']; + break; + } + } + + /** + * Init new socket connection + * + * @return true + */ + public function onAccept($socket, $flag) + { + $id = $this->_setSocketConnection(); + $connection = $this->_getSocketConnection($id); + + $onRead = $this->_getBufferReadHandler(); + $onWrite = $this->_getBufferWriteHandler(); + $onError = $this->_getBufferErrorHandler(); + + $this->_setEventBuffer($id, $connection, $onRead, $onWrite, $onError, EV_READ | EV_PERSIST); + + return true; + } + + /** + * Close socket connection after timeout or error + * + * @return void + */ + public function onError($buffer, $error, $id) + { + $this->_closeEventBuffer($id, EV_READ | EV_WRITE); + $this->_closeSocketConnection($id); + } + + /** + * Read from socket connection + * + * @return void + */ + public function onRead($buffer, $id) + { + $content = $this->_readEventBuffer($id, 256); + if (($params = json_decode($content, true))) { + $this->_dispatch($params); + } else { + var_dump($content); + } + } + + /** + * Return handler + * + * @return \Closure + */ + protected function _getBufferReadHandler() + { + $server = $this; + $handler = function ($buffer, $id) use ($server) { + return $server->onRead($buffer, $id); + }; + + return [$this, 'onRead']; + } + + /** + * Return handler + * + * @return \Closure + */ + protected function _getBufferWriteHandler() + { + return null; + } + + /** + * Return handler + * + * @return \Closure + */ + protected function _getBufferErrorHandler() + { + $server = $this; + $handler = function ($buffer, $error, $id) use ($server) { + return $server->onError($buffer, $error, $id); + }; + + return array($this, 'onError'); + } +} + diff --git a/CronManager/Queue/Job/Producer.php b/CronManager/Queue/Job/Producer.php new file mode 100644 index 0000000..72732d2 --- /dev/null +++ b/CronManager/Queue/Job/Producer.php @@ -0,0 +1,193 @@ +isTerminated()) { + $currentDate = new DateTime(null, $timezone); + $sec = $currentDate->format("s"); + if (!$first && $last === $sec) { + usleep(500000); + continue; + } + if ($first || ($sec != 0 && ($sec % 50) == 0)) { + $next = true; + $tmpJobs = $this->_getJobs(clone $currentDate); + } + if ($first) { + $jobs = $tmpJobs; + $first = false; + } elseif ($next && $sec >= 0) { + $jobs = $tmpJobs; + $next = false; + } + foreach ($jobs as $job) { + if (!$this->_cronSecondDue($currentDate, $job['second'])) { + continue; + } + $timestamp = $currentDate->getTimestamp(); + if (array_key_exists($job['id'], $lastJobsTimeStart) && $lastJobsTimeStart[$job['id']] === $timestamp) { + continue; + } + $lastJobsTimeStart[$job['id']] = $currentDate->getTimestamp(); + $this->_runJob($job); + } + $last = $sec; + } + } + + /** + * Load and set all cron jobs + * + * @param array $jobs + * @return \CronManager\Queue\Job\Queue\Producer + */ + public function setJobs() + { + $this->clearJobs(); + $cronjobs = Job::find(["status = '1'"])->toArray(); + if (!is_array($cronjobs)) { + return $this; + } + $this->_jobs = $cronjobs; + foreach ($this->_jobs as $job) { + $this->_crontabs[$job['id']] = CronExpression::factory($job['minute']." ".$job['hour']." ".$job['day']." ".$job['month']." ".$job['week_day']); + } + $this->_seconds = new SecondsField(); + + return $this; + } + + /** + * Clear cron jobs + * + * @return \CronManager\Queue\Job\Queue\Producer + */ + public function clearJobs() + { + $this->_jobs = []; + $this->_crontabs = []; + $this->_seconds = null; + + return $this; + } + + /** + * Return if exists jobs that will be run in next minute + * + * @return array + */ + protected function _getJobs(DateTime $datetime) + { + $this->setJobs(); + $jobs = []; + foreach ($this->_jobs as $job) { + $tmpDatetime = clone $datetime; + if (!$this->_crontabs[$job['id']]->isDue($tmpDatetime->modify("+1 minute"))) { + continue; + } + $jobs[] = $job; + } + + return $jobs; + } + + /** + * Parse crontime by second + * + * @param DateTime $timestamp + * @param string $cronsecond + * @return boolean + */ + protected function _cronSecondDue(DateTime $dateTime, $part) + { + if (strpos($part, ',') === false) { + return $this->_seconds->isSatisfiedBy($dateTime, $part); + } else { + foreach (array_map('trim', explode(',', $part)) as $listPart) { + if ($this->_seconds->isSatisfiedBy($dateTime, $listPart)) { + return true; + } + } + } + + return false; + } + + /** + * Add job task to queue + * + * @param array $job + * @return \CronManager\Queue\Job\Queue\Producer + */ + protected function _runJob(array $job) + { + $task = []; + $task['type'] = 'start'; + $task['name'] = $job['name']; + $hash = md5($job['id']."_".$job['name']."_".microtime()); + //$task['cmd'] = $job['command']." ".$job['id']; + $task['args'] = [$job['command'], $job['id'], $hash]; + $task['observers'] = [['observer' => 'mysql', 'hash' => $hash, 'job_id' => $job['id'], 'options' => ['processModel' => '\CronManager\Queue\Model\Process', 'logModel' => '\CronManager\Queue\Model\Log']]]; + + $this->_addQueue($task); + + return $this; + } +} + diff --git a/CronManager/Queue/Job/Producer/Add.php b/CronManager/Queue/Job/Producer/Add.php new file mode 100644 index 0000000..affe56f --- /dev/null +++ b/CronManager/Queue/Job/Producer/Add.php @@ -0,0 +1,59 @@ +_connect(); + } + + protected function _connect() + { + $config = $this->getDi()->get('config'); + $this->_queueProducer = new \Thumper\Producer($this->getDi()->get('thumperConnection')->getConnection()); + $this->_queueProducer->setExchangeOptions(['name' => $config->rabbitmq->jobExchangeName, 'type' => $config->rabbitmq->exchangeType]); + } + + /** + * Add job to queue + * + * @param array $job + * @return \CronManager\Queue\Job\Queue\Producer\Add + */ + protected function _addQueue(array $task) + { + try { + $this->_queueProducer->publish(igbinary_serialize($task)); + } catch (\Exception $e) { + $this->_message = $e->getMessage(); + $this->notify(); + + $this->_connect(); + $this->_queueProducer->publish(igbinary_serialize($task)); + } + $this->_message = "Add new job `".$task['name']."` in queue"; + $this->notify(); + + return $this; + } +} + diff --git a/CronManager/Queue/Model/Job.php b/CronManager/Queue/Model/Job.php new file mode 100644 index 0000000..7da31c0 --- /dev/null +++ b/CronManager/Queue/Model/Job.php @@ -0,0 +1,94 @@ +belongsTo("process_id", "Process", "id"); + } + + public function getSource() + { + return "cron_job"; + } +} diff --git a/CronManager/Queue/Model/Log.php b/CronManager/Queue/Model/Log.php new file mode 100644 index 0000000..6ec3171 --- /dev/null +++ b/CronManager/Queue/Model/Log.php @@ -0,0 +1,54 @@ +belongsTo("process_id", "Process", "id"); + } + + public function getSource() + { + return "cron_process_log"; + } +} diff --git a/CronManager/Queue/Model/Process.php b/CronManager/Queue/Model/Process.php new file mode 100644 index 0000000..b92a8e8 --- /dev/null +++ b/CronManager/Queue/Model/Process.php @@ -0,0 +1,84 @@ +belongsTo("job_id", "Job", "id"); + } + + public function getSource() + { + return "cron_process"; + } +} diff --git a/CronManager/Queue/Model/Settings.php b/CronManager/Queue/Model/Settings.php new file mode 100644 index 0000000..3ba65d1 --- /dev/null +++ b/CronManager/Queue/Model/Settings.php @@ -0,0 +1,56 @@ +setOptions($options); + + $this->_openLogFile(); + } + + /** + * Set options + * + * @param array $options + * @return \CronManager\Tools\Observer\File + */ + public function setOptions(array $options) + { + if (isset($options['logPath'])) { + $this->_path = rtrim($options['logPath'], "/\\")."/"; + } else { + $this->_path = sys_get_temp_dir()."/"; + } + + if (isset($options['logName'])) { + $this->_name = $options['logName']; + } else { + $this->_name = false; + } + + return $this; + } + + /** + * Init log file + * + * @return void + */ + private function _openLogFile() + { + if ($this->_name) { + $this->_logFile = fopen($this->_path.$this->_name.".log", "a+"); + } + } + + public function onEvent($subject) + { + if (!$this->_logFile) { + $this->_logFile = fopen($this->_path.strtolower(get_class($subject)).".log", "a+"); + } + $str = "------------------------------------\n"; + $str .= "Notify date: ".date("Y-m-d H:i:s")."\n"; + switch ($subject->getMessageType()) { + case 2: + $str .= "Message type: error".PHP_EOL; + break; + case 1: + default: + $str .= "Message type: info".PHP_EOL; + break; + } + + $str .= $subject->getMessage().PHP_EOL; + $str .= PHP_EOL; + + fwrite($this->_logFile, $str); + + $this->_update(); + } + + public function __destruct() + { + if ($this->_logFile) { + fclose($this->_logFile); + } + } +} \ No newline at end of file diff --git a/CronManager/Tools/Observer/Mysql.php b/CronManager/Tools/Observer/Mysql.php new file mode 100644 index 0000000..1e3d5ac --- /dev/null +++ b/CronManager/Tools/Observer/Mysql.php @@ -0,0 +1,70 @@ +setOptions($options); + } + + /** + * Set options + * + * @param array $options + * @return \CronManager\Tools\Observer\Mysql + */ + public function setOptions(array $options) + { + if (array_key_exists('model', $options)) { + $this->_model = $options['logName']; + } else { + throw new \Exception("Log model not set"); + } + + return $this; + } + + public function onEvent($subject) + { + $log = new $this->_model(); + + switch ($subject->getMessageType()) { + case 2: + $log->type = 'error'; + break; + case 1: + default: + $log->type = 'message'; + break; + } + + $log->class = get_class($subject); + $log->message = $subject->getMessage(); + $log->time = date("Y-m-d H:i:s"); + $log->save(); + } + +} \ No newline at end of file diff --git a/CronManager/Tools/Observer/Stdout.php b/CronManager/Tools/Observer/Stdout.php new file mode 100644 index 0000000..5e2c023 --- /dev/null +++ b/CronManager/Tools/Observer/Stdout.php @@ -0,0 +1,55 @@ +setOptions($options); + } + + /** + * Set options + * + * @param array $options + * @return \CronManager\Tools\Observer\Stdout + */ + public function setOptions(array $options) + { + return $this; + } + + public function onEvent($subject, $messageType) + { + $str = PHP_EOL."Class: ".get_class($subject).", date: ".date("Y-m-d H:i:s").PHP_EOL; + switch ($messageType) { + case 2: + $str .= "Message type: error".PHP_EOL; + break; + case 1: + default: + $str .= "Message type: info".PHP_EOL; + break; + } + + $str .= $subject->getMessage().PHP_EOL; + + echo $str; + } +} \ No newline at end of file diff --git a/CronManager/Traits/DIaware.php b/CronManager/Traits/DIaware.php new file mode 100644 index 0000000..f4fbe6d --- /dev/null +++ b/CronManager/Traits/DIaware.php @@ -0,0 +1,39 @@ +_di = $dependencyInjector; + } + + /** + * Returns the internal dependency injector + * + * @return \Phalcon\DiInterface + */ + public function getDi() + { + return $this->_di; + } + +} \ No newline at end of file diff --git a/CronManager/Traits/Daemon/Event.php b/CronManager/Traits/Daemon/Event.php new file mode 100644 index 0000000..9699609 --- /dev/null +++ b/CronManager/Traits/Daemon/Event.php @@ -0,0 +1,287 @@ + EV_TIMEOUT, + 'signal' => EV_SIGNAL, + 'read' => EV_READ, + 'write' => EV_WRITE, + 'persist' => EV_PERSIST + ]; + + /** + * Statistic event + * @var event + */ + private $_eventTimer; + + /** + * Statistic event timeout in microseconds + * @var integer + */ + protected $_eventTimerInterval = 1000000; + + /** + * Initialize event base + * + * @return void + */ + protected function _initEventBase() + { + $this->_eventBase = event_base_new(); + } + + /** + * Set new event + * + * @param string $name + * @param resource $resource + * @param string $descriptor + * @param string|array $handler + * @return void + */ + protected function _setEvent($name, $resource, $descriptor, $handler) + { + if (isset($this->_events[$name]) || $handler === null) { + return false; + } + $event = event_new(); + event_set($event, $resource, $descriptor, $handler); + $res = event_base_set($event, $this->_eventBase); + event_add($event); + + $this->_events[$name] = $event; + } + + /** + * Close event by name + * + * @param integer $id + * @return boolean + */ + protected function _closeEvent($name) + { + if (!isset($this->_events[$name])) { + return false; + } + event_del($this->_events[$name]); + event_free($this->_events[$name]); + unset($this->_events[$name]); + + return true; + } + + /** + * Set new event buffer + * + * @param integer $id + * @param resource $resource + * @param string|array $onRead + * @param string|array $onWrite + * @param string|array $onError + * @return void + */ + protected function _setEventBuffer($id, $resource, $onRead, $onWrite, $onError, $descriptor) + { + $buffer = event_buffer_new($resource, $onRead, $onWrite, $onError, $id); + event_buffer_base_set($buffer, $this->_eventBase); + event_buffer_timeout_set($buffer, 30, 30); + event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff); + event_buffer_priority_set($buffer, 10); + event_buffer_enable($buffer, $descriptor); + + $this->_eventBuffers[$id] = $buffer; + } + + /** + * Get event buffer by id + * + * @param integer $id + * @return resource + */ + protected function _getEventBuffer($id) + { + if (!isset($this->_eventBuffers[$id])) { + return false; + } + + return $this->_eventBuffers[$id]; + } + + /** + * Read from buffer by id + * + * @param integer $id + * @param integer $length + * @return string|boolean + */ + protected function _readEventBuffer($id, $length) + { + if (!isset($this->_eventBuffers[$id])) { + return false; + } + $content = ''; + while($read = event_buffer_read($this->_eventBuffers[$id], $length)) { + $content .= $read; + } + + return $content; + } + + /** + * Close event buffer by id + * + * @param integer $id + * @param string $descriptor + * @return boolean + */ + protected function _closeEventBuffer($id, $descriptor = null) + { + if (!isset($this->_eventBuffers[$id])) { + return false; + } + event_buffer_disable($this->_eventBuffers[$id], $descriptor); + event_buffer_free($this->_eventBuffers[$id]); + unset($this->_eventBuffers[$id]); + + return true; + } + + /** + * Handle events + * + * return void + */ + protected function _eventLoop() + { + event_base_loop($this->_eventBase, EVLOOP_NONBLOCK); + } + + /** + * Close events + * + * @return void + */ + private function _closeEvents() + { + foreach ($this->_events as $name => $event) { + event_del($event); + event_free($event); + unset($this->_events[$name]); + } + if ($this->_eventTimer) { + event_del($this->_eventTimer); + event_free($this->_eventTimer); + $this->_eventTimer = false; + } + if ($this->_eventBase) { + event_base_free($this->_eventBase); + } + } + + /** + * Close event buffers + * + * @return void + */ + private function _closeBuffers() + { + foreach ($this->_eventBuffers as $id => $buffer) { + event_buffer_disable($buffer); + event_buffer_free($buffer); + unset($this->_eventBuffers[$id]); + } + } + + /** + * Return handler + * + * @return \Closure + */ + protected function _timerEventHandler() + { + $server = $this; + $handler = function ($tmpfile, $flag ,$interval) use ($server) { + $server->onTimer($tmpfile, $flag, $interval); + }; + + return [$this, 'onTimer']; + } + + /** + * Initialize statistic event + * + * @return boolean + */ + protected function _initTimer() + { + $tmpfile = tmpfile(); + $this->_eventTimer = event_new(); + event_set($this->_eventTimer, $tmpfile, 0, $this->_timerEventHandler(), $this->_eventTimerInterval); + $res = event_base_set($this->_eventTimer, $this->_eventBase); + + return event_add($this->_eventTimer, $this->_eventTimerInterval); + } + + /** + * Statistic event callback + * + * @return void + */ + public function onTimer($tmpfile, $flag, $interval) + { + if ($this->_eventTimer) { + event_del($this->_eventTimer); + event_free($this->_eventTimer); + } + $this->_eventStatistic(); + + $this->_eventTimer = event_new(); + event_set($this->_eventTimer, $tmpfile, 0, $this->_timerEventHandler(), $interval); + $res = event_base_set($this->_eventTimer, $this->_eventBase); + + return event_add($this->_eventTimer, $interval); + } + + /** + * Event timer function + * + * @return void + */ + protected function _eventStatistic() + { + + } +} \ No newline at end of file diff --git a/CronManager/Traits/Daemon/Fork.php b/CronManager/Traits/Daemon/Fork.php new file mode 100644 index 0000000..da8b31e --- /dev/null +++ b/CronManager/Traits/Daemon/Fork.php @@ -0,0 +1,77 @@ + 0) { + //echo "Daemon process started\n"; + exit; + } + $sid = posix_setsid(); + if ($sid < 0) { + exit(2); + } + } + + /** + * Initialize pid file + * + * @return void + */ + private function _initPIDFile() + { + $this->_pid = getmypid(); + file_put_contents($this->_pidFile, $this->_pid); + + } + + /** + * Unlink pid file + * + * @return void + */ + private function _closePIDFile() + { + if (file_exists($this->_pidFile)) { + unlink($this->_pidFile); + } + } + + /** + * Return pid number + * + * @return integer + */ + public function getPID() + { + return $this->_pid; + } +} \ No newline at end of file diff --git a/CronManager/Traits/Daemon/Logs.php b/CronManager/Traits/Daemon/Logs.php new file mode 100644 index 0000000..0d34dba --- /dev/null +++ b/CronManager/Traits/Daemon/Logs.php @@ -0,0 +1,62 @@ +_STDIN = fopen('/dev/null', 'r'); + $this->_STDOUT = fopen($this->_logFile, 'ab'); + $this->_STDERR = fopen($this->_errorLogFile, 'ab'); + } + + /** + * Close logs + * + * @return void + */ + private function _closeLogs() + { + if ($this->_STDIN) { + fclose($this->_STDIN); + } + if ($this->_STDIN) { + fclose($this->_STDIN); + } + if ($this->_STDIN) { + fclose($this->_STDIN); + } + } + +} \ No newline at end of file diff --git a/CronManager/Traits/Daemon/Socket/Client.php b/CronManager/Traits/Daemon/Socket/Client.php new file mode 100644 index 0000000..2f3549f --- /dev/null +++ b/CronManager/Traits/Daemon/Socket/Client.php @@ -0,0 +1,82 @@ +_socket = stream_socket_client($this->_socketFile, $this->_socketErrno, $this->_socketErrstr, 5); + stream_set_blocking($this->_socket, 0); + } + + /** + * Write to server + * + * @return void + */ + public function write($message) + { + if (!$this->_socket) { + $this->_initClient(); + } + fwrite($this->_socket, $message); + $this->_closeClient(); + } + + /** + * Read from server + * + * @return string + */ + public function read() + { + if (!$this->_socket) { + $this->_initClient(); + } + $content = ''; + while (!feof($this->_socket)) { + $content .= fgets($this->_socket, 128); + } + $this->_closeClient(); + + return $content; + } + + protected function _closeClient() + { + if (!$this->_socket) { + return; + } + fclose($this->_socket); + $this->_socket = false; + } +} diff --git a/CronManager/Traits/Daemon/Socket/Server.php b/CronManager/Traits/Daemon/Socket/Server.php new file mode 100644 index 0000000..c477d1e --- /dev/null +++ b/CronManager/Traits/Daemon/Socket/Server.php @@ -0,0 +1,147 @@ +_errno = 0; + $this->_errstr = ''; + $this->_socket = stream_socket_server($this->_socketFile, $this->_errno, $this->_errstr); + if (!$this->_socket) { + echo "Socket not init\n"; + exit(2); + } + stream_set_blocking($this->_socket, 0); + } + + /** + * Checking for free socket connection + * + * @return boolean + */ + protected function _existsFreeConnection() + { + return !(count($this->_socketConnections) == $this->_socketMaxConnections); + } + + /** + * Set new socket connection + */ + protected function _setSocketConnection() + { + if (!$this->_existsFreeConnection()) { + echo "Max connections\n"; + return false; + } + $connection = stream_socket_accept($this->_socket); + if (!$connection) { + echo "Not connectged\n"; + return; + } + stream_set_blocking($connection, 0); + $id = $this->_socketConnectionPrimaryId; + $this->_socketConnections[] = $connection; + ++$this->_socketConnectionPrimaryId; + + return $id; + } + + /** + * Get socket connection by id + * + * @param integer $id + * @return resource + */ + protected function _getSocketConnection($id) + { + if (!isset($this->_socketConnections[$id])) { + return false; + } + + return $this->_socketConnections[$id]; + } + + /** + * Close socket connection by id + * + * @param integer $id + * @return boolean + */ + protected function _closeSocketConnection($id) + { + if (!isset($this->_socketConnections[$id])) { + return false; + } + fclose($this->_socketConnections[$id]); + unset($this->_socketConnections[$id]); + + return true; + } + + /** + * Close socket connections + * + * @return boolean + */ + protected function _closeSocketConnections() + { + foreach ($this->_socketConnections as $id => $connection) { + fclose($connection); + unset($this->_socketConnections[$id]); + } + + return true; + } + + /** + * Close socket server + * + * @return void + */ + protected function _closeSocket() + { + if ($this->_socket) { + fclose($this->_socket); + unlink($this->_socketFile); + } + } +} \ No newline at end of file diff --git a/CronManager/Traits/Locking.php b/CronManager/Traits/Locking.php new file mode 100644 index 0000000..c6f35d6 --- /dev/null +++ b/CronManager/Traits/Locking.php @@ -0,0 +1,67 @@ +_lockFile) && filesize($this->_lockFile) > 0) { + $pid = trim(file_get_contents($this->_lockFile)); + } + if (FALSE !== ($this->_lockFH = fopen($this->_lockFile, 'w'))) { + if (TRUE !== flock($this->_lockFH, LOCK_EX | LOCK_NB)) { + if (!($pid && !$this->_checkPid(trim($pid)))) { + $this->_notUnlock = true; + fwrite($this->_lockFH, $pid); + exit; + } + } + } else { + exit; + } + + fwrite($this->_lockFH, getmypid()); + } + + /** + * Ulocking process + * + * @return void + */ + protected function _unlocking() + { + if ($this->_lockFH && !$this->_notUnlock) { + fclose($this->_lockFH); + unlink($this->_lockFile); + } + } + + /** + * Check process id + * + * @return boolean + */ + protected function _checkPid($pid) + { + exec("ps -p $pid -o pid", $status); + + return (count($status) > 1 && $status[1] > 0) ? true : false; + } +} \ No newline at end of file diff --git a/CronManager/Traits/Message.php b/CronManager/Traits/Message.php new file mode 100644 index 0000000..b523039 --- /dev/null +++ b/CronManager/Traits/Message.php @@ -0,0 +1,40 @@ +_message; + } + + /** + * Set notify message + * + * @param string $message + * @return \CronManager\Manager\Job + */ + public function setMessage($message) + { + $this->_message = $message; + return $this; + } +} \ No newline at end of file diff --git a/CronManager/Traits/Observable.php b/CronManager/Traits/Observable.php new file mode 100644 index 0000000..ed1d9e8 --- /dev/null +++ b/CronManager/Traits/Observable.php @@ -0,0 +1,54 @@ +__observers[] = $observer; + + return $this; + } + + /** + * Notify + * + * @return \CronManager\Traitss\Observable + */ + public function notify($messageType = 1) + { + foreach ($this->__observers as $o) { + $o->onEvent($this, $messageType); + } + + return $this; + } + + /** + * Abstarct get message method + * + * @return string + */ + abstract public function getMessage(); +} \ No newline at end of file diff --git a/CronManager/Traits/Observer.php b/CronManager/Traits/Observer.php new file mode 100644 index 0000000..0799d0c --- /dev/null +++ b/CronManager/Traits/Observer.php @@ -0,0 +1,33 @@ +addObserver($this); + + return $this; + } +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..a217344 --- /dev/null +++ b/composer.json @@ -0,0 +1,21 @@ +{ + "name": "temafey/phalcon-cron", + "type": "library", + "description": "Cron manager engine base on Phalcon framework", + "keywords": ["phalcon", "cron","job"], + "homepage": "https://github.com/temafey/phalcon_cron", + "license": "MIT", + "require": { + "php": ">=5.4.0", + "phalcon/incubator": "dev-master", + "phalcon/devtools": "dev-master", + "videlalvaro/php-amqplib": "v2.1.0", + "videlalvaro/thumper": "v0.3.2", + "mtdowling/cron-expression": "dev-master" + }, + "autoload": { + "psr-0": { + "Library": "/" + } + } +} diff --git a/sql/migrations.sql b/sql/migrations.sql new file mode 100644 index 0000000..b0e24de --- /dev/null +++ b/sql/migrations.sql @@ -0,0 +1,86 @@ +-- -------------------------------------------------------- + +-- +-- Table structure for table `cron_job` +-- + +DROP TABLE IF EXISTS `cron_job`; +CREATE TABLE IF NOT EXISTS `cron_job` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `name` varchar(200) NOT NULL, + `command` varchar(200) NOT NULL, + `second` varchar(100) NOT NULL, + `minute` varchar(100) NOT NULL, + `hour` varchar(100) NOT NULL, + `day` varchar(100) NOT NULL, + `month` varchar(100) NOT NULL, + `week_day` varchar(100) NOT NULL, + `status` tinyint(1) NOT NULL DEFAULT '0', + `ttl` int(11) NOT NULL DEFAULT '0', + `max_attempts` tinyint(2) NOT NULL DEFAULT '5', + `desc` varchar(250) NOT NULL, + PRIMARY KEY (`id`), + KEY `status` (`status`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +-- -------------------------------------------------------- + +-- +-- Table structure for table `cron_process` +-- + +DROP TABLE IF EXISTS `cron_process`; +CREATE TABLE IF NOT EXISTS `cron_process` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `job_id` int(11) NOT NULL, + `hash` varchar(40) NOT NULL, + `command` varchar(255) NOT NULL, + `action` varchar(40) NOT NULL, + `pid` int(6) NOT NULL, + `status` enum('run','running','completed','aborted','error','stopped','stop','waiting','finished') NOT NULL, + `stime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `time` int(11) NOT NULL DEFAULT '0', + `phash` varchar(40) NOT NULL DEFAULT '1', + `attempt` tinyint(2) NOT NULL DEFAULT '0', + PRIMARY KEY (`id`), + UNIQUE KEY `hash` (`hash`), + KEY `phash` (`phash`,`status`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +-- -------------------------------------------------------- + +-- +-- Table structure for table `cron_process_log` +-- + +DROP TABLE IF EXISTS `cron_process_log`; +CREATE TABLE IF NOT EXISTS `cron_process_log` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `process_id` int(11) NOT NULL, + `type` varchar(100) NOT NULL, + `message` text NOT NULL, + `time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + KEY `process_id` (`process_id`), + KEY `time` (`time`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +-- -------------------------------------------------------- + +-- +-- Table structure for table `cron_settings` +-- + +DROP TABLE IF EXISTS `cron_settings`; +CREATE TABLE IF NOT EXISTS `cron_settings` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `environment` varchar(200) NOT NULL, + `max_pool` int(11) NOT NULL DEFAULT '10', + `min_free_memory_mb` int(11) NOT NULL DEFAULT '0', + `min_free_memory_percentage` int(11) NOT NULL DEFAULT '10', + `max_cpu_load` int(11) NOT NULL DEFAULT '40', + `action_status` smallint(2) NOT NULL DEFAULT '1', + `status` tinyint(1) NOT NULL DEFAULT '0', + PRIMARY KEY (`id`), + KEY `status` (`status`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8;