Bridge with doctrine/dbal

Refer to the official documentation on Doctrine’s website.

This bridge provides SQL database querying mechanisms, whenever you need to read/write from it.
Along with the ability to store your JobExecution in an SQL database.

Store JobExecution objects in an SQL database

Read from a paginated SQL query

The reader will yield each row fetched by an SQL query, one at a time, as an associative array.
The provided SQL query MUST include {limit} and {offset} placeholders to function correctly.
The limit and offset will be used to execute the query as many times as needed.
<?php

declare(strict_types=1);

use Doctrine\Persistence\ConnectionRegistry;
use Yokai\Batch\Bridge\Doctrine\DBAL\DoctrineDBALQueryOffsetReader;

/** @var ConnectionRegistry $connectionRegistry */

new DoctrineDBALQueryOffsetReader(
    doctrine: $connectionRegistry,
    sql: 'select * from user limit {limit} offset {offset};',
    connection: null, // will use default one, but you can pick any registered connection name
    batch: 500, // configure how many rows are fetched every time the query is executed
);

Warning

Due to the nature of SQL offset <https://hackernoon.com/dont-offset-your-sql-querys-performance>__, if the total result set is very large, consider using a cursor-based query instead.

Read from a cursored SQL query

The reader will yield each row fetched by an SQL query, one at a time, as an associative array.
The provided SQL query MUST include {after} and {limit} placeholders to function correctly.
The after and limit will be used to execute the query as many times as needed.
<?php

declare(strict_types=1);

use Doctrine\Persistence\ConnectionRegistry;
use Yokai\Batch\Bridge\Doctrine\DBAL\DoctrineDBALQueryCursorReader;

/** @var ConnectionRegistry $connectionRegistry */

new DoctrineDBALQueryCursorReader(
    doctrine: $connectionRegistry,
    sql: 'select * from user where id > {after} order by id asc limit {limit};',
    column: 'id', // the column that is used in both where and sort clauses
    start: 0, // the very first value to use, generally 0 or a very old date
    connection: null, // will use default one, but you can pick any registered connection name
    batch: 500, // configure how many rows are fetched every time the query is executed
);

Write items using inserts in a SQL table

The writer will insert each item into the same SQL table.
It expects that items are associative arrays.
<?php

declare(strict_types=1);

use Doctrine\DBAL\Types\Types;
use Doctrine\Persistence\ConnectionRegistry;
use Yokai\Batch\Bridge\Doctrine\DBAL\DoctrineDBALInsertWriter;

/** @var ConnectionRegistry $connectionRegistry */

// Column types are inferred from the table schema automatically
new DoctrineDBALInsertWriter(
    doctrine: $connectionRegistry,
    table: 'user',
    connection: null, // will use default one, but you can pick any registered connection name
);

// Or provide explicit type hints to override auto-detection
new DoctrineDBALInsertWriter(
    doctrine: $connectionRegistry,
    table: 'user',
    types: ['created_at' => Types::DATETIME_IMMUTABLE, 'active' => Types::BOOLEAN],
);

Write items using upsert in a SQL table

The writer will insert or update each item in an SQL table, with the item determining the behavior.
It expects that items are DoctrineDBALUpsert objects.
<?php

declare(strict_types=1);

use Doctrine\Persistence\ConnectionRegistry;
use Yokai\Batch\Bridge\Doctrine\DBAL\DoctrineDBALUpsertWriter;

/** @var ConnectionRegistry $connectionRegistry */

new DoctrineDBALUpsertWriter(
    doctrine: $connectionRegistry,
    connection: null, // will use default one, but you can pick any registered connection name
);
Typically, this writer requires an ItemProcessorInterface to transform items into DoctrineDBALUpsert objects.
These objects should contain:
  • The table where the record should be inserted or updated

  • All the data to insert or update

  • A field and value that will be used to identify the record to update (when applicable)

<?php

declare(strict_types=1);

use Doctrine\Persistence\ConnectionRegistry;
use Yokai\Batch\Bridge\Doctrine\DBAL\DoctrineDBALUpsert;
use Yokai\Batch\Bridge\Doctrine\DBAL\DoctrineDBALUpsertWriter;
use Yokai\Batch\Job\Item\ItemJob;
use Yokai\Batch\Job\Item\ItemReaderInterface;
use Yokai\Batch\Job\Item\Processor\CallbackProcessor;
use Yokai\Batch\Storage\JobExecutionStorageInterface;

/** @var ConnectionRegistry $connectionRegistry */
/** @var ItemReaderInterface $reader */
/** @var JobExecutionStorageInterface $executionStorage */

new ItemJob(
    batchSize: 500,
    reader: $reader,
    processor: new CallbackProcessor(fn (array $data) => new DoctrineDBALUpsert(
        table: 'user',
        data: $data,
        identity: isset($data['id']) ? ['id' => $data['id']] : [],
    )),
    writer: new DoctrineDBALUpsertWriter($connectionRegistry),
    executionStorage: $executionStorage,
);