File "Publisher.php"
Full Path: /home/rrterraplen/public_html/wp-content-20241221212636/plugins/worker/src/Gelf/Publisher.php
File size: 7.61 KB
MIME-type: text/x-php
Charset: utf-8
<?php
class Gelf_Publisher
{
const CHUNK_SIZE_WAN = 1420;
const CHUNK_SIZE_LAN = 8154;
const GRAYLOG2_DEFAULT_PORT = 12201;
const GRAYLOG2_PROTOCOL_VERSION = '1.0';
/**
* @var string
*/
protected $hostname;
/**
* @var int
*/
protected $port;
/**
* @var int|null
*/
protected $fallbackPort;
/**
* @var int
*/
protected $chunkSize;
/**
* @var resource|null
*/
protected $streamSocketClient = null;
/**
* @var bool
*/
private static $brokenSocket = false;
/**
* Creates a new publisher that sends errors to a Graylog2 server via UDP
*
* @throws InvalidArgumentException
*
* @param string $hostname
* @param integer $port
* @param integer|null $fallbackPort
* @param integer $chunkSize
*/
public function __construct($hostname, $port = null, $fallbackPort = null, $chunkSize = null)
{
// Check whether the parameters are set correctly
if (!$hostname) {
throw new InvalidArgumentException('$hostname must be set');
}
if ($port === null) {
$port = self::GRAYLOG2_DEFAULT_PORT;
} elseif (!is_numeric($port)) {
throw new InvalidArgumentException('$port must be an integer');
}
if ($fallbackPort !== null && !is_numeric($fallbackPort)) {
throw new InvalidArgumentException('$fallbackPort must be an integer');
}
if ($chunkSize === null) {
$chunkSize = self::CHUNK_SIZE_WAN;
} elseif (!is_numeric($chunkSize)) {
throw new InvalidArgumentException('$chunkSize must be an integer');
}
$this->hostname = $hostname;
$this->port = $port;
$this->fallbackPort = $fallbackPort;
$this->chunkSize = $chunkSize;
}
/**
* Publishes a Gelf_Message, returns false if an error occurred during write.
*
* @throws UnexpectedValueException
*
* @param Gelf_Message $message
*
* @return boolean
*/
public function publish(Gelf_Message $message)
{
if (self::$brokenSocket) {
return false;
}
// Check if required message parameters are set
if (!$message->getShortMessage() || !$message->getHost()) {
throw new UnexpectedValueException(
'Missing required data parameter: "version", "short_message" and "host" are required.'
);
}
// Set Graylog protocol version
$message->setVersion(self::GRAYLOG2_PROTOCOL_VERSION);
// Encode the message as json string and compress it using gzip
$preparedMessage = $this->getPreparedMessage($message);
// Infinite-loop break.
self::$brokenSocket = true;
// Open a connection to GrayLog server.
$socket = $this->getSocketConnection();
if (!$socket) {
return false;
}
self::$brokenSocket = false;
// Several udp writes are required to publish the message
if ($this->isMessageSizeGreaterChunkSize($preparedMessage)) {
// A unique id which consists of the microtime and a random value
$messageId = $this->getMessageId();
// Split the message into chunks.
$messageChunks = $this->getMessageChunks($preparedMessage);
$messageChunksCount = count($messageChunks);
// Send chunks to GrayLog server.
foreach (array_values($messageChunks) as $messageChunkIndex => $messageChunk) {
$bytesWritten = $this->writeMessageChunkToSocket(
$socket,
$messageId,
$messageChunk,
$messageChunkIndex,
$messageChunksCount
);
if (false === $bytesWritten) {
// Abort due to write error
return false;
}
}
} else {
// A single write is enough to get the message published
if (false === $this->writeMessageToSocket($socket, $preparedMessage)) {
// Abort due to write error
return false;
}
}
// This increases stability a lot if messages are sent in a loop
// A value of 20 means 0.02 ms
usleep(20);
// Message successful sent
return true;
}
/**
* @param Gelf_Message $message
*
* @return string
*/
protected function getPreparedMessage(Gelf_Message $message)
{
return gzcompress(json_encode($message->toArray()));
}
/**
* @return resource|false
*/
protected function getSocketConnection()
{
if (!$this->streamSocketClient) {
$hostname = gethostbyname($this->hostname);
$this->streamSocketClient = stream_socket_client(sprintf('udp://%s:%d', $hostname, $this->port));
if ($this->streamSocketClient === false && $this->fallbackPort) {
$this->streamSocketClient = stream_socket_client(sprintf('tcp://%s:%d', $hostname, $this->fallbackPort));
}
}
return $this->streamSocketClient;
}
/**
* @param string $preparedMessage
*
* @return boolean
*/
protected function isMessageSizeGreaterChunkSize($preparedMessage)
{
return (strlen($preparedMessage) > $this->chunkSize);
}
/**
* @return float
*/
protected function getMessageId()
{
return (float)(microtime(true).mt_rand(0, 10000));
}
/**
* @param string $preparedMessage
*
* @return array
*/
protected function getMessageChunks($preparedMessage)
{
return str_split($preparedMessage, $this->chunkSize);
}
/**
* @param float $messageId
* @param string $data
* @param integer $sequence
* @param integer $sequenceSize
*
* @throws InvalidArgumentException
* @return string
*/
protected function prependChunkInformation($messageId, $data, $sequence, $sequenceSize)
{
if (!is_string($data) || $data === '') {
throw new InvalidArgumentException('Data must be a string and not be empty.');
}
if (!is_integer($sequence) || !is_integer($sequenceSize)) {
throw new InvalidArgumentException('Sequence number and size must be integer.');
}
if ($sequenceSize <= 0) {
throw new InvalidArgumentException('Sequence size must be greater than 0.');
}
if ($sequence > $sequenceSize) {
throw new InvalidArgumentException('Sequence size must be greater than sequence number.');
}
return pack('CC', 30, 15).substr(md5($messageId, true), 0, 8).pack('CC', $sequence, $sequenceSize).$data;
}
/**
* @param resource $socket
* @param float $messageId
* @param string $messageChunk
* @param integer $messageChunkIndex
* @param integer $messageChunksCount
*
* @return integer|boolean
*/
protected function writeMessageChunkToSocket($socket, $messageId, $messageChunk, $messageChunkIndex, $messageChunksCount)
{
return fwrite(
$socket,
$this->prependChunkInformation($messageId, $messageChunk, $messageChunkIndex, $messageChunksCount)
);
}
/**
* @param resource $socket
* @param string $preparedMessage
*
* @return integer|boolean
*/
protected function writeMessageToSocket($socket, $preparedMessage)
{
return fwrite($socket, $preparedMessage);
}
}