Subida del módulo y tema de PrestaShop

This commit is contained in:
Kaloyan
2026-04-09 18:31:51 +02:00
parent 12c253296f
commit 16b3ff9424
39262 changed files with 7418797 additions and 0 deletions

View File

@@ -0,0 +1,19 @@
Copyright (c) 2018-present Fabien Potencier
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@@ -0,0 +1,633 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
use Doctrine\DBAL\Abstraction\Result as AbstractionResult;
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\Driver\Exception as DriverException;
use Doctrine\DBAL\Driver\ResultStatement;
use Doctrine\DBAL\Exception as DBALException;
use Doctrine\DBAL\Exception\TableNotFoundException;
use Doctrine\DBAL\LockMode;
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Doctrine\DBAL\Platforms\OraclePlatform;
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Doctrine\DBAL\Query\ForUpdate\ConflictResolutionMode;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\AbstractAsset;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Comparator;
use Doctrine\DBAL\Schema\ComparatorConfig;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\SchemaDiff;
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use Doctrine\DBAL\Schema\Table;
use Doctrine\DBAL\Types\Types;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Contracts\Service\ResetInterface;
/**
* @internal
*
* @author Vincent Touzet <vincent.touzet@gmail.com>
* @author Kévin Dunglas <dunglas@gmail.com>
*/
class Connection implements ResetInterface
{
protected const TABLE_OPTION_NAME = '_symfony_messenger_table_name';
protected const DEFAULT_OPTIONS = [
'table_name' => 'messenger_messages',
'queue_name' => 'default',
'redeliver_timeout' => 3600,
'auto_setup' => true,
];
private const ORACLE_SEQUENCES_SUFFIX = '_seq';
/**
* Configuration of the connection.
*
* Available options:
*
* * table_name: name of the table
* * connection: name of the Doctrine's entity manager
* * queue_name: name of the queue
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600
* * auto_setup: Whether the table should be created automatically during send / get. Default: true
*/
protected array $configuration;
protected DBALConnection $driverConnection;
protected ?float $queueEmptiedAt = null;
private ?SchemaSynchronizer $schemaSynchronizer;
private bool $autoSetup;
public function __construct(array $configuration, DBALConnection $driverConnection, ?SchemaSynchronizer $schemaSynchronizer = null)
{
$this->configuration = array_replace_recursive(static::DEFAULT_OPTIONS, $configuration);
$this->driverConnection = $driverConnection;
$this->schemaSynchronizer = $schemaSynchronizer;
$this->autoSetup = $this->configuration['auto_setup'];
}
public function reset(): void
{
$this->queueEmptiedAt = null;
}
public function getConfiguration(): array
{
return $this->configuration;
}
public static function buildConfiguration(#[\SensitiveParameter] string $dsn, array $options = []): array
{
if (false === $params = parse_url($dsn)) {
throw new InvalidArgumentException('The given Doctrine Messenger DSN is invalid.');
}
$query = [];
if (isset($params['query'])) {
parse_str($params['query'], $query);
}
$configuration = ['connection' => $params['host']];
$configuration += $query + $options + static::DEFAULT_OPTIONS;
$configuration['auto_setup'] = filter_var($configuration['auto_setup'], \FILTER_VALIDATE_BOOL);
// check for extra keys in options
$optionsExtraKeys = array_diff(array_keys($options), array_keys(static::DEFAULT_OPTIONS));
if (0 < \count($optionsExtraKeys)) {
throw new InvalidArgumentException(\sprintf('Unknown option found: [%s]. Allowed options are [%s].', implode(', ', $optionsExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
}
// check for extra keys in options
$queryExtraKeys = array_diff(array_keys($query), array_keys(static::DEFAULT_OPTIONS));
if (0 < \count($queryExtraKeys)) {
throw new InvalidArgumentException(\sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
}
return $configuration;
}
/**
* @param int $delay The delay in milliseconds
*
* @return string The inserted id
*
* @throws DBALException
*/
public function send(string $body, array $headers, int $delay = 0): string
{
$now = new \DateTimeImmutable('UTC');
$availableAt = $now->modify(\sprintf('%+d seconds', $delay / 1000));
$queryBuilder = $this->driverConnection->createQueryBuilder()
->insert($this->configuration['table_name'])
->values([
'body' => '?',
'headers' => '?',
'queue_name' => '?',
'created_at' => '?',
'available_at' => '?',
]);
return $this->executeInsert($queryBuilder->getSQL(), [
$body,
json_encode($headers),
$this->configuration['queue_name'],
$now,
$availableAt,
], [
Types::STRING,
Types::STRING,
Types::STRING,
Types::DATETIME_IMMUTABLE,
Types::DATETIME_IMMUTABLE,
]);
}
public function get(): ?array
{
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
try {
$this->driverConnection->delete($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59']);
} catch (DriverException $e) {
// Ignore the exception
} catch (TableNotFoundException $e) {
if ($this->autoSetup) {
$this->setup();
}
}
}
get:
$this->driverConnection->beginTransaction();
try {
$query = $this->createAvailableMessagesQueryBuilder()
->orderBy('available_at', 'ASC')
->setMaxResults(1);
if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
$query->select('m.id');
}
// Append pessimistic write lock to FROM clause if db platform supports it
$sql = $query->getSQL();
// Wrap the rownum query in a sub-query to allow writelocks without ORA-02014 error
if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
$query = $this->createQueryBuilder('w')
->where('w.id IN ('.str_replace('SELECT a.* FROM', 'SELECT a.id FROM', $sql).')')
->setParameters($query->getParameters(), $query->getParameterTypes());
if (method_exists(QueryBuilder::class, 'forUpdate')) {
$query->forUpdate(ConflictResolutionMode::SKIP_LOCKED);
}
$sql = $query->getSQL();
} elseif (method_exists(QueryBuilder::class, 'forUpdate')) {
$query->forUpdate(ConflictResolutionMode::SKIP_LOCKED);
try {
$sql = $query->getSQL();
} catch (DBALException $e) {
// If SKIP_LOCKED is not supported, fallback to without SKIP_LOCKED
$query->forUpdate();
try {
$sql = $query->getSQL();
} catch (DBALException $e) {
}
}
} elseif (preg_match('/FROM (.+) WHERE/', (string) $sql, $matches)) {
$fromClause = $matches[1];
$sql = str_replace(
\sprintf('FROM %s WHERE', $fromClause),
\sprintf('FROM %s WHERE', $this->driverConnection->getDatabasePlatform()->appendLockHint($fromClause, LockMode::PESSIMISTIC_WRITE)),
$sql
);
}
// use SELECT ... FOR UPDATE to lock table
if (!method_exists(QueryBuilder::class, 'forUpdate')) {
$sql .= ' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL();
}
$stmt = $this->executeQuery(
$sql,
$query->getParameters(),
$query->getParameterTypes()
);
$doctrineEnvelope = $stmt instanceof Result ? $stmt->fetchAssociative() : $stmt->fetch();
if (false === $doctrineEnvelope) {
$this->driverConnection->commit();
$this->queueEmptiedAt = microtime(true) * 1000;
return null;
}
// Postgres can "group" notifications having the same channel and payload
// We need to be sure to empty the queue before blocking again
$this->queueEmptiedAt = null;
$doctrineEnvelope = $this->decodeEnvelopeHeaders($doctrineEnvelope);
$queryBuilder = $this->driverConnection->createQueryBuilder()
->update($this->configuration['table_name'])
->set('delivered_at', '?')
->where('id = ?');
$now = new \DateTimeImmutable('UTC');
$this->executeStatement($queryBuilder->getSQL(), [
$now,
$doctrineEnvelope['id'],
], [
Types::DATETIME_IMMUTABLE,
]);
$this->driverConnection->commit();
return $doctrineEnvelope;
} catch (\Throwable $e) {
$this->driverConnection->rollBack();
if ($this->autoSetup && $e instanceof TableNotFoundException) {
$this->setup();
goto get;
}
throw $e;
}
}
public function ack(string $id): bool
{
try {
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0;
}
return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
public function reject(string $id): bool
{
try {
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0;
}
return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
public function setup(): void
{
$configuration = $this->driverConnection->getConfiguration();
$assetFilter = $configuration->getSchemaAssetsFilter();
$configuration->setSchemaAssetsFilter(function ($tableName) {
if ($tableName instanceof AbstractAsset) {
$tableName = $tableName->getName();
}
if (!\is_string($tableName)) {
throw new \TypeError(\sprintf('The table name must be an instance of "%s" or a string ("%s" given).', AbstractAsset::class, get_debug_type($tableName)));
}
return $tableName === $this->configuration['table_name'];
});
$this->updateSchema();
$configuration->setSchemaAssetsFilter($assetFilter);
$this->autoSetup = false;
}
public function getMessageCount(): int
{
$queryBuilder = $this->createAvailableMessagesQueryBuilder()
->select('COUNT(m.id) AS message_count')
->setMaxResults(1);
$stmt = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes());
return $stmt instanceof Result ? $stmt->fetchOne() : $stmt->fetchColumn();
}
public function findAll(?int $limit = null): array
{
$queryBuilder = $this->createAvailableMessagesQueryBuilder();
if (null !== $limit) {
$queryBuilder->setMaxResults($limit);
}
$stmt = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes());
$data = $stmt instanceof Result ? $stmt->fetchAllAssociative() : $stmt->fetchAll();
return array_map(fn ($doctrineEnvelope) => $this->decodeEnvelopeHeaders($doctrineEnvelope), $data);
}
public function find(mixed $id): ?array
{
$queryBuilder = $this->createQueryBuilder()
->where('m.id = ? and m.queue_name = ?');
$stmt = $this->executeQuery($queryBuilder->getSQL(), [$id, $this->configuration['queue_name']]);
$data = $stmt instanceof Result ? $stmt->fetchAssociative() : $stmt->fetch();
return false === $data ? null : $this->decodeEnvelopeHeaders($data);
}
/**
* @internal
*/
public function configureSchema(Schema $schema, DBALConnection $forConnection, \Closure $isSameDatabase): void
{
if ($schema->hasTable($this->configuration['table_name'])) {
return;
}
if ($forConnection !== $this->driverConnection && !$isSameDatabase($this->executeStatement(...))) {
return;
}
$this->addTableToSchema($schema);
}
/**
* @internal
*/
public function getExtraSetupSqlForTable(Table $createdTable): array
{
return [];
}
private function createAvailableMessagesQueryBuilder(): QueryBuilder
{
$now = new \DateTimeImmutable('UTC');
$redeliverLimit = $now->modify(\sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
return $this->createQueryBuilder()
->where('m.queue_name = ?')
->andWhere('m.delivered_at is null OR m.delivered_at < ?')
->andWhere('m.available_at <= ?')
->setParameters([
$this->configuration['queue_name'],
$redeliverLimit,
$now,
], [
Types::STRING,
Types::DATETIME_IMMUTABLE,
Types::DATETIME_IMMUTABLE,
]);
}
private function createQueryBuilder(string $alias = 'm'): QueryBuilder
{
$queryBuilder = $this->driverConnection->createQueryBuilder()
->from($this->configuration['table_name'], $alias);
$alias .= '.';
if (!$this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
return $queryBuilder->select($alias.'*');
}
// Oracle databases use UPPER CASE on tables and column identifiers.
// Column alias is added to force the result to be lowercase even when the actual field is all caps.
return $queryBuilder->select(str_replace(', ', ', '.$alias,
$alias.'id AS "id", body AS "body", headers AS "headers", queue_name AS "queue_name", '.
'created_at AS "created_at", available_at AS "available_at", '.
'delivered_at AS "delivered_at"'
));
}
private function executeQuery(string $sql, array $parameters = [], array $types = []): Result|AbstractionResult|ResultStatement
{
try {
$stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
} catch (TableNotFoundException $e) {
if (!$this->autoSetup || $this->driverConnection->isTransactionActive()) {
throw $e;
}
$this->setup();
$stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
}
return $stmt;
}
protected function executeStatement(string $sql, array $parameters = [], array $types = []): int|string
{
try {
$stmt = $this->driverConnection->executeStatement($sql, $parameters, $types);
} catch (TableNotFoundException $e) {
if (!$this->autoSetup || $this->driverConnection->isTransactionActive()) {
throw $e;
}
$this->setup();
$stmt = $this->driverConnection->executeStatement($sql, $parameters, $types);
}
return $stmt;
}
private function executeInsert(string $sql, array $parameters = [], array $types = []): string
{
// Use PostgreSQL RETURNING clause instead of lastInsertId() to get the
// inserted id in one operation instead of two.
if ($this->driverConnection->getDatabasePlatform() instanceof PostgreSQLPlatform) {
$sql .= ' RETURNING id';
}
insert:
$this->driverConnection->beginTransaction();
try {
if ($this->driverConnection->getDatabasePlatform() instanceof PostgreSQLPlatform) {
$first = $this->driverConnection->fetchFirstColumn($sql, $parameters, $types);
$id = $first[0] ?? null;
if (!$id) {
throw new TransportException('no id was returned by PostgreSQL from RETURNING clause.');
}
} elseif ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
$sequenceName = $this->configuration['table_name'].self::ORACLE_SEQUENCES_SUFFIX;
$this->driverConnection->executeStatement($sql, $parameters, $types);
$result = $this->driverConnection->fetchOne('SELECT '.$sequenceName.'.CURRVAL FROM DUAL');
$id = (int) $result;
if (!$id) {
throw new TransportException('no id was returned by Oracle from sequence: '.$sequenceName);
}
} else {
$this->driverConnection->executeStatement($sql, $parameters, $types);
if (!$id = $this->driverConnection->lastInsertId()) {
throw new TransportException('lastInsertId() returned false, no id was returned.');
}
}
$this->driverConnection->commit();
} catch (\Throwable $e) {
$this->driverConnection->rollBack();
// handle setup after transaction is no longer open
if ($this->autoSetup && $e instanceof TableNotFoundException) {
$this->setup();
goto insert;
}
throw $e;
}
return $id;
}
private function getSchema(): Schema
{
$schema = new Schema([], [], $this->createSchemaManager()->createSchemaConfig());
$this->addTableToSchema($schema);
return $schema;
}
private function addTableToSchema(Schema $schema): void
{
$table = $schema->createTable($this->configuration['table_name']);
// add an internal option to mark that we created this & the non-namespaced table name
$table->addOption(self::TABLE_OPTION_NAME, $this->configuration['table_name']);
$idColumn = $table->addColumn('id', Types::BIGINT)
->setAutoincrement(true)
->setNotnull(true);
$table->addColumn('body', Types::TEXT)
->setNotnull(true);
$table->addColumn('headers', Types::TEXT)
->setNotnull(true);
$table->addColumn('queue_name', Types::STRING)
->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
->setNotnull(true);
$table->addColumn('created_at', Types::DATETIME_IMMUTABLE)
->setNotnull(true);
$table->addColumn('available_at', Types::DATETIME_IMMUTABLE)
->setNotnull(true);
$table->addColumn('delivered_at', Types::DATETIME_IMMUTABLE)
->setNotnull(false);
$table->setPrimaryKey(['id']);
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
$table->addIndex(['delivered_at']);
// We need to create a sequence for Oracle and set the id column to get the correct nextval
if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
$idColumn->setDefault($this->configuration['table_name'].self::ORACLE_SEQUENCES_SUFFIX.'.nextval');
$schema->createSequence($this->configuration['table_name'].self::ORACLE_SEQUENCES_SUFFIX);
}
}
private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
{
$doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true);
return $doctrineEnvelope;
}
private function updateSchema(): void
{
if (null !== $this->schemaSynchronizer) {
$this->schemaSynchronizer->updateSchema($this->getSchema(), true);
return;
}
$schemaManager = $this->createSchemaManager();
$comparator = $this->createComparator($schemaManager);
$schemaDiff = $this->compareSchemas($comparator, method_exists($schemaManager, 'introspectSchema') ? $schemaManager->introspectSchema() : $schemaManager->createSchema(), $this->getSchema());
$platform = $this->driverConnection->getDatabasePlatform();
if (!method_exists(SchemaDiff::class, 'getCreatedSchemas')) {
foreach ($schemaDiff->toSaveSql($platform) as $sql) {
$this->driverConnection->executeStatement($sql);
}
return;
}
if ($platform->supportsSchemas()) {
foreach ($schemaDiff->getCreatedSchemas() as $schema) {
$this->driverConnection->executeStatement($platform->getCreateSchemaSQL($schema));
}
}
if ($platform->supportsSequences()) {
foreach ($schemaDiff->getAlteredSequences() as $sequence) {
$this->driverConnection->executeStatement($platform->getAlterSequenceSQL($sequence));
}
foreach ($schemaDiff->getCreatedSequences() as $sequence) {
$this->driverConnection->executeStatement($platform->getCreateSequenceSQL($sequence));
}
}
foreach ($platform->getCreateTablesSQL($schemaDiff->getCreatedTables()) as $sql) {
$this->driverConnection->executeStatement($sql);
}
foreach ($schemaDiff->getAlteredTables() as $tableDiff) {
foreach ($platform->getAlterTableSQL($tableDiff) as $sql) {
$this->driverConnection->executeStatement($sql);
}
}
}
private function createSchemaManager(): AbstractSchemaManager
{
return method_exists($this->driverConnection, 'createSchemaManager')
? $this->driverConnection->createSchemaManager()
: $this->driverConnection->getSchemaManager();
}
private function createComparator(AbstractSchemaManager $schemaManager): Comparator
{
if (class_exists(ComparatorConfig::class)) {
return $schemaManager->createComparator((new ComparatorConfig())->withReportModifiedIndexes(false));
}
return method_exists($schemaManager, 'createComparator')
? $schemaManager->createComparator()
: new Comparator();
}
private function compareSchemas(Comparator $comparator, Schema $from, Schema $to): SchemaDiff
{
return method_exists($comparator, 'compareSchemas') || method_exists($comparator, 'doCompareSchemas')
? $comparator->compareSchemas($from, $to)
: $comparator->compare($from, $to);
}
}

View File

@@ -0,0 +1,32 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineReceivedStamp implements NonSendableStampInterface
{
private string $id;
public function __construct(string $id)
{
$this->id = $id;
}
public function getId(): string
{
return $this->id;
}
}

View File

@@ -0,0 +1,179 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
use Doctrine\DBAL\Exception as DBALException;
use Doctrine\DBAL\Exception\RetryableException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareInterface
{
private const MAX_RETRIES = 3;
private int $retryingSafetyCounter = 0;
private Connection $connection;
private SerializerInterface $serializer;
public function __construct(Connection $connection, ?SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
public function get(): iterable
{
try {
$doctrineEnvelope = $this->connection->get();
$this->retryingSafetyCounter = 0; // reset counter
} catch (RetryableException $exception) {
// Do nothing when RetryableException occurs less than "MAX_RETRIES"
// as it will likely be resolved on the next call to get()
// Problem with concurrent consumers and database deadlocks
if (++$this->retryingSafetyCounter >= self::MAX_RETRIES) {
$this->retryingSafetyCounter = 0; // reset counter
throw new TransportException($exception->getMessage(), 0, $exception);
}
return [];
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
if (null === $doctrineEnvelope) {
return [];
}
return [$this->createEnvelopeFromData($doctrineEnvelope)];
}
public function ack(Envelope $envelope): void
{
$this->withRetryableExceptionRetry(function () use ($envelope) {
$this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId());
});
}
public function reject(Envelope $envelope): void
{
$this->withRetryableExceptionRetry(function () use ($envelope) {
$this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
});
}
public function getMessageCount(): int
{
try {
return $this->connection->getMessageCount();
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
public function all(?int $limit = null): iterable
{
try {
$doctrineEnvelopes = $this->connection->findAll($limit);
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
foreach ($doctrineEnvelopes as $doctrineEnvelope) {
yield $this->createEnvelopeFromData($doctrineEnvelope);
}
}
public function find(mixed $id): ?Envelope
{
try {
$doctrineEnvelope = $this->connection->find($id);
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
if (null === $doctrineEnvelope) {
return null;
}
return $this->createEnvelopeFromData($doctrineEnvelope);
}
private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceivedStamp
{
/** @var DoctrineReceivedStamp|null $doctrineReceivedStamp */
$doctrineReceivedStamp = $envelope->last(DoctrineReceivedStamp::class);
if (null === $doctrineReceivedStamp) {
throw new LogicException('No DoctrineReceivedStamp found on the Envelope.');
}
return $doctrineReceivedStamp;
}
private function createEnvelopeFromData(array $data): Envelope
{
try {
$envelope = $this->serializer->decode([
'body' => $data['body'],
'headers' => $data['headers'],
]);
} catch (MessageDecodingFailedException $exception) {
$this->connection->reject($data['id']);
throw $exception;
}
return $envelope
->withoutAll(TransportMessageIdStamp::class)
->with(
new DoctrineReceivedStamp($data['id']),
new TransportMessageIdStamp($data['id'])
);
}
private function withRetryableExceptionRetry(callable $callable): void
{
$delay = 100;
$multiplier = 2;
$jitter = 0.1;
$retries = 0;
retry:
try {
$callable();
} catch (RetryableException $exception) {
if (++$retries <= self::MAX_RETRIES) {
$delay *= $multiplier;
$randomness = (int) ($delay * $jitter);
$delay += random_int(-$randomness, +$randomness);
usleep($delay * 1000);
goto retry;
}
throw new TransportException($exception->getMessage(), 0, $exception);
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
}

View File

@@ -0,0 +1,53 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
use Doctrine\DBAL\Exception as DBALException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineSender implements SenderInterface
{
private Connection $connection;
private SerializerInterface $serializer;
public function __construct(Connection $connection, ?SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);
/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delay = null !== $delayStamp ? $delayStamp->getDelay() : 0;
try {
$id = $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
return $envelope->with(new TransportMessageIdStamp($id));
}
}

View File

@@ -0,0 +1,111 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
use Doctrine\DBAL\Connection as DbalConnection;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Table;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface
{
private Connection $connection;
private SerializerInterface $serializer;
private DoctrineReceiver $receiver;
private DoctrineSender $sender;
public function __construct(Connection $connection, SerializerInterface $serializer)
{
$this->connection = $connection;
$this->serializer = $serializer;
}
public function get(): iterable
{
return $this->getReceiver()->get();
}
public function ack(Envelope $envelope): void
{
$this->getReceiver()->ack($envelope);
}
public function reject(Envelope $envelope): void
{
$this->getReceiver()->reject($envelope);
}
public function getMessageCount(): int
{
return $this->getReceiver()->getMessageCount();
}
public function all(?int $limit = null): iterable
{
return $this->getReceiver()->all($limit);
}
public function find(mixed $id): ?Envelope
{
return $this->getReceiver()->find($id);
}
public function send(Envelope $envelope): Envelope
{
return $this->getSender()->send($envelope);
}
public function setup(): void
{
$this->connection->setup();
}
/**
* Adds the Table to the Schema if this transport uses this connection.
*
* @param \Closure $isSameDatabase
*/
public function configureSchema(Schema $schema, DbalConnection $forConnection/* , \Closure $isSameDatabase */): void
{
$isSameDatabase = 2 < \func_num_args() ? func_get_arg(2) : static fn () => false;
$this->connection->configureSchema($schema, $forConnection, $isSameDatabase);
}
/**
* Adds extra SQL if the given table was created by the Connection.
*
* @return string[]
*/
public function getExtraSetupSqlForTable(Table $createdTable): array
{
return $this->connection->getExtraSetupSqlForTable($createdTable);
}
private function getReceiver(): DoctrineReceiver
{
return $this->receiver ??= new DoctrineReceiver($this->connection, $this->serializer);
}
private function getSender(): DoctrineSender
{
return $this->sender ??= new DoctrineSender($this->connection, $this->serializer);
}
}

View File

@@ -0,0 +1,64 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Doctrine\Persistence\ConnectionRegistry;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*
* @implements TransportFactoryInterface<DoctrineTransport>
*/
class DoctrineTransportFactory implements TransportFactoryInterface
{
private ConnectionRegistry $registry;
public function __construct(ConnectionRegistry $registry)
{
$this->registry = $registry;
}
/**
* @param array $options You can set 'use_notify' to false to not use LISTEN/NOTIFY with postgresql
*/
public function createTransport(#[\SensitiveParameter] string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
$useNotify = ($options['use_notify'] ?? true);
unset($options['transport_name'], $options['use_notify']);
// Always allow PostgreSQL-specific keys, to be able to transparently fallback to the native driver when LISTEN/NOTIFY isn't available
$configuration = PostgreSqlConnection::buildConfiguration($dsn, $options);
try {
$driverConnection = $this->registry->getConnection($configuration['connection']);
} catch (\InvalidArgumentException $e) {
throw new TransportException('Could not find Doctrine connection from Messenger DSN.', 0, $e);
}
if ($useNotify && $driverConnection->getDatabasePlatform() instanceof PostgreSQLPlatform) {
$connection = new PostgreSqlConnection($configuration, $driverConnection);
} else {
$connection = new Connection($configuration, $driverConnection);
}
return new DoctrineTransport($connection, $serializer);
}
public function supports(#[\SensitiveParameter] string $dsn, array $options): bool
{
return str_starts_with($dsn, 'doctrine://');
}
}

View File

@@ -0,0 +1,154 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
use Doctrine\DBAL\Schema\Table;
/**
* Uses PostgreSQL LISTEN/NOTIFY to push messages to workers.
*
* If you do not want to use the LISTEN mechanism, set the `use_notify` option to `false` when calling DoctrineTransportFactory::createTransport.
*
* @internal
*
* @author Kévin Dunglas <dunglas@gmail.com>
*/
final class PostgreSqlConnection extends Connection
{
/**
* * check_delayed_interval: The interval to check for delayed messages, in milliseconds. Set to 0 to disable checks. Default: 60000 (1 minute)
* * get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. Default: 0.
*/
protected const DEFAULT_OPTIONS = parent::DEFAULT_OPTIONS + [
'check_delayed_interval' => 60000,
'get_notify_timeout' => 0,
];
public function __sleep(): array
{
throw new \BadMethodCallException('Cannot serialize '.__CLASS__);
}
/**
* @return void
*/
public function __wakeup()
{
throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
}
public function __destruct()
{
$this->unlisten();
}
public function reset(): void
{
parent::reset();
$this->unlisten();
}
public function get(): ?array
{
if (null === $this->queueEmptiedAt) {
return parent::get();
}
// This is secure because the table name must be a valid identifier:
// https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
$this->executeStatement(\sprintf('LISTEN "%s"', $this->configuration['table_name']));
// The condition should be removed once support for DBAL <3.3 is dropped
if (method_exists($this->driverConnection, 'getNativeConnection')) {
$wrappedConnection = $this->driverConnection->getNativeConnection();
} else {
$wrappedConnection = $this->driverConnection;
while (method_exists($wrappedConnection, 'getWrappedConnection')) {
$wrappedConnection = $wrappedConnection->getWrappedConnection();
}
}
$notification = $wrappedConnection->pgsqlGetNotify(\PDO::FETCH_ASSOC, $this->configuration['get_notify_timeout']);
if (
// no notifications, or for another table or queue
(false === $notification || $notification['message'] !== $this->configuration['table_name'] || $notification['payload'] !== $this->configuration['queue_name'])
// delayed messages
&& (microtime(true) * 1000 - $this->queueEmptiedAt < $this->configuration['check_delayed_interval'])
) {
usleep(1000);
return null;
}
return parent::get();
}
public function setup(): void
{
parent::setup();
$this->executeStatement(implode("\n", $this->getTriggerSql()));
}
/**
* @return string[]
*/
public function getExtraSetupSqlForTable(Table $createdTable): array
{
if (!$createdTable->hasOption(self::TABLE_OPTION_NAME)) {
return [];
}
if ($createdTable->getOption(self::TABLE_OPTION_NAME) !== $this->configuration['table_name']) {
return [];
}
return $this->getTriggerSql();
}
private function getTriggerSql(): array
{
$functionName = $this->createTriggerFunctionName();
return [
// create trigger function
\sprintf(<<<'SQL'
CREATE OR REPLACE FUNCTION %1$s() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('%2$s', NEW.queue_name::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
SQL
, $functionName, $this->configuration['table_name']),
// register trigger
\sprintf('DROP TRIGGER IF EXISTS notify_trigger ON %s;', $this->configuration['table_name']),
\sprintf('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();', $this->configuration['table_name'], $functionName),
];
}
private function createTriggerFunctionName(): string
{
$tableConfig = explode('.', $this->configuration['table_name']);
if (1 === \count($tableConfig)) {
return \sprintf('notify_%1$s', $tableConfig[0]);
}
return \sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]);
}
private function unlisten(): void
{
$this->executeStatement(\sprintf('UNLISTEN "%s"', $this->configuration['table_name']));
}
}