HEX
Server: LiteSpeed
System: Linux linux31.centraldnserver.com 4.18.0-553.83.1.lve.el8.x86_64 #1 SMP Wed Nov 12 10:04:12 UTC 2025 x86_64
User: salamatk (1501)
PHP: 8.1.33
Disabled: show_source, system, shell_exec, passthru, exec, popen, proc_open
Upload Files
File: //proc/self/cwd/wp-content/plugins/woocommerce/src/Internal/Admin/Schedulers/OrdersScheduler.php
<?php
/**
 * Order syncing related functions and actions.
 */

namespace Automattic\WooCommerce\Internal\Admin\Schedulers;

defined( 'ABSPATH' ) || exit;

use Automattic\WooCommerce\Admin\API\Reports\Cache as ReportsCache;
use Automattic\WooCommerce\Admin\API\Reports\Coupons\DataStore as CouponsDataStore;
use Automattic\WooCommerce\Admin\API\Reports\Customers\DataStore as CustomersDataStore;
use Automattic\WooCommerce\Admin\API\Reports\Orders\DataStore as OrderDataStore;
use Automattic\WooCommerce\Admin\API\Reports\Orders\Stats\DataStore as OrdersStatsDataStore;
use Automattic\WooCommerce\Admin\API\Reports\Products\DataStore as ProductsDataStore;
use Automattic\WooCommerce\Admin\API\Reports\Taxes\DataStore as TaxesDataStore;
use Automattic\WooCommerce\Internal\DataStores\Orders\OrdersTableDataStore;
use Automattic\WooCommerce\Utilities\OrderUtil;

/**
 * OrdersScheduler Class.
 */
class OrdersScheduler extends ImportScheduler {
	/**
	 * Slug to identify the scheduler.
	 *
	 * @var string
	 */
	public static $name = 'orders';

	/**
	 * Option name for storing the last processed order modified date.
	 *
	 * This is used as a cursor to track progress through the orders table.
	 * We need both date and ID because multiple orders can have the same
	 * date_updated timestamp (e.g., bulk operations, imports). Without tracking
	 * the ID, we would endlessly reprocess orders at the same timestamp when
	 * the batch size is smaller than the number of orders at that timestamp.
	 *
	 * The stored value is a GMT date string in 'Y-m-d H:i:s' format.
	 *
	 * @var string
	 */
	const LAST_PROCESSED_ORDER_DATE_OPTION = 'woocommerce_admin_scheduler_last_processed_order_modified_date';

	/**
	 * Option name for storing the last processed order ID.
	 *
	 * Used in conjunction with LAST_PROCESSED_ORDER_DATE_OPTION to handle
	 * cases where multiple orders have the same date_updated timestamp.
	 * Query pattern: WHERE (date > last_date) OR (date = last_date AND id > last_id)
	 *
	 * @var string
	 */
	const LAST_PROCESSED_ORDER_ID_OPTION = 'woocommerce_admin_scheduler_last_processed_order_id';

	/**
	 * Option name for storing whether to enable immediate order import.
	 *
	 * Read by is_immediate_import_enabled(), which interprets the stored value
	 * with FILTER_VALIDATE_BOOLEAN and defaults to enabled when the option is absent.
	 *
	 * @var string
	 */
	const IMMEDIATE_IMPORT_OPTION = 'woocommerce_analytics_immediate_import';

	/**
	 * Register the order lookup hooks and boot the report data stores.
	 *
	 * @internal
	 */
	public static function init() {
		// Register the filters provided by the Admin order and refund overrides.
		\Automattic\WooCommerce\Admin\Overrides\Order::add_filters();
		\Automattic\WooCommerce\Admin\Overrides\OrderRefund::add_filters();

		// Legacy behavior: schedule an import as soon as an order is created, updated or refunded.
		// The callback always returns its argument, so it is safe to attach to a filter as well.
		$schedule_import = array( __CLASS__, 'possibly_schedule_import' );
		add_action( 'woocommerce_update_order', $schedule_import );
		add_filter( 'woocommerce_create_order', $schedule_import );
		add_action( 'woocommerce_refund_created', $schedule_import );
		add_action( 'woocommerce_schedule_import', $schedule_import );

		// Make sure the recurring batch processor is (un)scheduled on every request.
		add_action( 'init', array( __CLASS__, 'schedule_recurring_batch_processor' ) );

		// Initialise every report data store this scheduler writes to.
		OrdersStatsDataStore::init();
		CouponsDataStore::init();
		ProductsDataStore::init();
		TaxesDataStore::init();
		OrderDataStore::init();

		parent::init();
	}

	/**
	 * Map this scheduler's actions to the customer scheduler actions they depend on.
	 *
	 * Only `import_batch_init` has a dependency: the CustomersScheduler action of
	 * the same name. NOTE(review): presumably the parent scheduler uses this map
	 * to hold the order import back until customers are imported; confirm in ImportScheduler.
	 *
	 * @internal
	 * @return array Dependency action names keyed by this scheduler's action name.
	 */
	public static function get_dependencies() {
		$dependencies = array();

		// CustomersScheduler lives in this same namespace, so the short name resolves to it.
		$dependencies['import_batch_init'] = CustomersScheduler::get_action( 'import_batch_init' );

		return $dependencies;
	}

	/**
	 * List every scheduling action this scheduler supports.
	 *
	 * Starts from the parent's actions and adds the pending-orders batch processor.
	 *
	 * @internal
	 * @return array Action hook names keyed by action slug.
	 */
	public static function get_scheduler_actions() {
		$inherited_actions = parent::get_scheduler_actions();
		$own_actions       = array(
			'process_pending_batch' => 'wc-admin_process_pending_orders_batch',
		);

		return array_merge( $inherited_actions, $own_actions );
	}

	/**
	 * List the batch size used by each OrdersScheduler action.
	 *
	 * Starts from the parent's sizes and adds the pending-orders batch processor (100 per run).
	 *
	 * @internal
	 * @return array Batch sizes keyed by action slug.
	 */
	public static function get_batch_sizes() {
		$inherited_sizes = parent::get_batch_sizes();
		$own_sizes       = array(
			'process_pending_batch' => 100,
		);

		return array_merge( $inherited_sizes, $own_sizes );
	}

	/**
	 * Get the order/refund IDs and total count that need to be synced.
	 *
	 * Delegates to the HPOS orders table or the posts table, depending on
	 * which order storage is currently in use.
	 *
	 * @internal
	 * @param int      $limit Number of records to retrieve.
	 * @param int      $page  Page number.
	 * @param int|bool $days Number of days prior to current date to limit search results.
	 * @param bool     $skip_existing Skip already imported orders.
	 *
	 * @return object Object with `total` and `ids` properties.
	 */
	public static function get_items( $limit = 10, $page = 1, $days = false, $skip_existing = false ) {
		if ( ! OrderUtil::custom_orders_table_usage_is_enabled() ) {
			return self::get_items_from_posts_table( $limit, $page, $days, $skip_existing );
		}

		return self::get_items_from_orders_table( $limit, $page, $days, $skip_existing );
	}

	/**
	 * Helper method to get order/refund IDs and the total count that need to be synced
	 * from the posts table.
	 *
	 * Rows are ordered by creation date with the post ID as a tie-breaker, so that
	 * LIMIT/OFFSET paging stays deterministic when several orders share the same
	 * post_date_gmt (otherwise rows could be skipped or repeated across pages).
	 *
	 * @internal
	 * @param int      $limit Number of records to retrieve.
	 * @param int      $page  Page number (1-based; anything below 2 maps to offset 0).
	 * @param int|bool $days Number of days prior to current date to limit search results. Only applied when an integer.
	 * @param bool     $skip_existing Skip orders that already have a row in wc_order_stats.
	 *
	 * @return object Object with `total` (int count of all matching rows) and `ids` (IDs for the requested page).
	 */
	private static function get_items_from_posts_table( $limit, $page, $days, $skip_existing ) {
		global $wpdb;
		$where_clause = '';
		$offset       = $page > 1 ? ( $page - 1 ) * $limit : 0;

		if ( is_int( $days ) ) {
			// The date string is produced locally by gmdate(), so it is safe to interpolate.
			$days_ago      = gmdate( 'Y-m-d 00:00:00', time() - ( DAY_IN_SECONDS * $days ) );
			$where_clause .= " AND post_date_gmt >= '{$days_ago}'";
		}

		if ( $skip_existing ) {
			$where_clause .= " AND NOT EXISTS (
				SELECT 1 FROM {$wpdb->prefix}wc_order_stats
				WHERE {$wpdb->prefix}wc_order_stats.order_id = {$wpdb->posts}.ID
			)";
		}

		$count = $wpdb->get_var(
			"SELECT COUNT(*) FROM {$wpdb->posts}
			WHERE post_type IN ( 'shop_order', 'shop_order_refund' )
			AND post_status NOT IN ( 'wc-auto-draft', 'auto-draft', 'trash' )
			{$where_clause}"
		); // phpcs:ignore unprepared SQL ok.

		$total = absint( $count );

		// Skip the ID query entirely when nothing matches.
		$order_ids = $total > 0 ? $wpdb->get_col(
			$wpdb->prepare(
				"SELECT ID FROM {$wpdb->posts}
				WHERE post_type IN ( 'shop_order', 'shop_order_refund' )
				AND post_status NOT IN ( 'wc-auto-draft', 'auto-draft', 'trash' )
				{$where_clause}
				ORDER BY post_date_gmt ASC, ID ASC
				LIMIT %d
				OFFSET %d",
				$limit,
				$offset
			)
		) : array(); // phpcs:ignore unprepared SQL ok.

		return (object) array(
			'total' => $total,
			'ids'   => $order_ids,
		);
	}

	/**
	 * Helper method to get order/refund IDs and the total count that need to be synced from HPOS.
	 *
	 * Rows are ordered by creation date with the order ID as a tie-breaker, so that
	 * LIMIT/OFFSET paging stays deterministic when several orders share the same
	 * date_created_gmt (otherwise rows could be skipped or repeated across pages).
	 *
	 * @internal
	 * @param int      $limit Number of records to retrieve.
	 * @param int      $page  Page number (1-based; anything below 2 maps to offset 0).
	 * @param int|bool $days Number of days prior to current date to limit search results. Only applied when an integer.
	 * @param bool     $skip_existing Skip orders that already have a row in wc_order_stats.
	 *
	 * @return object Object with `total` (int count of all matching rows) and `ids` (IDs for the requested page).
	 */
	private static function get_items_from_orders_table( $limit, $page, $days, $skip_existing ) {
		global $wpdb;
		$where_clause = '';
		$offset       = $page > 1 ? ( $page - 1 ) * $limit : 0;
		$order_table  = OrdersTableDataStore::get_orders_table_name();

		if ( is_int( $days ) ) {
			// The date string is produced locally by gmdate(), so it is safe to interpolate.
			$days_ago      = gmdate( 'Y-m-d 00:00:00', time() - ( DAY_IN_SECONDS * $days ) );
			$where_clause .= " AND orders.date_created_gmt >= '{$days_ago}'";
		}

		if ( $skip_existing ) {
			// The leading space keeps this fragment separated from any preceding condition.
			$where_clause .= " AND NOT EXISTS (
				SELECT 1 FROM {$wpdb->prefix}wc_order_stats
				WHERE {$wpdb->prefix}wc_order_stats.order_id = orders.id
			)";
		}

		$count = $wpdb->get_var(
			"SELECT COUNT(*) FROM {$order_table} AS orders
			WHERE type IN ( 'shop_order', 'shop_order_refund' )
			AND status NOT IN ( 'wc-auto-draft', 'auto-draft', 'trash' )
			{$where_clause}"
		); // phpcs:ignore unprepared SQL ok.

		$total = absint( $count );

		// Skip the ID query entirely when nothing matches.
		$order_ids = $total > 0 ? $wpdb->get_col(
			$wpdb->prepare(
				"SELECT id FROM {$order_table} AS orders
				WHERE type IN ( 'shop_order', 'shop_order_refund' )
				AND status NOT IN ( 'wc-auto-draft', 'auto-draft', 'trash' )
				{$where_clause}
				ORDER BY orders.date_created_gmt ASC, orders.id ASC
				LIMIT %d
				OFFSET %d",
				$limit,
				$offset
			)
		) : array(); // phpcs:ignore unprepared SQL ok.

		return (object) array(
			'total' => $total,
			'ids'   => $order_ids,
		);
	}

	/**
	 * Count the rows currently stored in the wc_order_stats lookup table.
	 *
	 * @internal
	 * @return string|null Row count exactly as returned by $wpdb->get_var().
	 */
	public static function get_total_imported() {
		global $wpdb;

		$imported_rows = $wpdb->get_var( "SELECT COUNT(*) FROM {$wpdb->prefix}wc_order_stats" );

		return $imported_rows;
	}

	/**
	 * Schedule an import for the given ID when it is an order or refund.
	 *
	 * Does nothing unless immediate import is enabled through the
	 * IMMEDIATE_IMPORT_OPTION option; otherwise orders are picked up by the
	 * recurring batch processor. An import is scheduled when the ID is a
	 * `shop_order`, or when the current hook is `woocommerce_refund_created`
	 * or `woocommerce_schedule_import`.
	 *
	 * The argument is always returned unchanged so the method can also be used
	 * as a filter callback.
	 *
	 * @param int $order_id Post ID.
	 *
	 * @internal
	 * @return int The order id
	 */
	public static function possibly_schedule_import( $order_id ) {
		if ( ! self::is_immediate_import_enabled() ) {
			return $order_id;
		}

		// The order check runs first; the hook name is only consulted when it fails.
		$should_schedule = OrderUtil::is_order( $order_id, array( 'shop_order' ) )
			|| in_array( current_filter(), array( 'woocommerce_refund_created', 'woocommerce_schedule_import' ), true );

		if ( $should_schedule ) {
			self::schedule_action( 'import', array( $order_id ) );
		}

		return $order_id;
	}

	/**
	 * Imports a single order or refund to update lookup tables for.
	 *
	 * Syncs the order stats, products, coupons, taxes and customer lookup data,
	 * re-syncs the stats of every refund attached to a `shop_order`, invalidates
	 * the reports cache and fires `woocommerce_order_scheduler_after_import_order`.
	 *
	 * The return values of the individual sync calls are not inspected, and no
	 * retry is scheduled from here when one of them fails.
	 *
	 * @internal
	 * @param int $order_id Order or refund ID.
	 * @return void
	 */
	public static function import( $order_id ) {
		$order = wc_get_order( $order_id );

		// If the order isn't found for some reason, skip the sync.
		if ( ! $order ) {
			return;
		}

		$type = $order->get_type();

		// If the order isn't the right type, skip sync.
		if ( 'shop_order' !== $type && 'shop_order_refund' !== $type ) {
			return;
		}

		// If the order has no id or date created, skip sync.
		if ( ! $order->get_id() || ! $order->get_date_created() ) {
			return;
		}

		// Update every lookup table; the results are intentionally not collected.
		OrdersStatsDataStore::sync_order( $order_id );
		ProductsDataStore::sync_order_products( $order_id );
		CouponsDataStore::sync_order_coupons( $order_id );
		TaxesDataStore::sync_order_taxes( $order_id );
		CustomersDataStore::sync_order_customer( $order_id );

		if ( 'shop_order' === $type ) {
			// Re-sync the stats rows of the refunds attached to this order.
			foreach ( $order->get_refunds() as $refund ) {
				OrdersStatsDataStore::sync_order( $refund->get_id() );
			}
		}

		ReportsCache::invalidate();

		/**
		 * Fires after an order or refund has been imported into Analytics lookup tables
		 * and the reports cache has been invalidated.
		 *
		 * @since 10.3.0
		 * @param int $order_id Order or refund ID.
		 */
		do_action( 'woocommerce_order_scheduler_after_import_order', $order_id );
	}

	/**
	 * Keep the recurring pending-orders batch action in sync with the import mode.
	 *
	 * With immediate import disabled, the cursor options are seeded if needed and
	 * the recurring action is scheduled unless one already exists. With immediate
	 * import enabled, any existing recurring action is unscheduled instead.
	 *
	 * @internal
	 */
	public static function schedule_recurring_batch_processor() {
		if ( ! self::is_immediate_import_enabled() ) {
			// Make sure the cursor exists before the first batch runs.
			self::initialize_last_processed_date();

			/**
			 * Filters the interval for the recurring batch processor.
			 *
			 * @since 9.5.0
			 * @param int $interval Interval in seconds. Default 12 hours.
			 */
			$interval = apply_filters( 'woocommerce_analytics_import_interval', 12 * HOUR_IN_SECONDS );

			$recurring_hook = self::get_action( 'process_pending_batch' );

			// Nothing to do when the recurring action is already queued.
			if ( self::has_existing_jobs( 'process_pending_batch', array() ) ) {
				return;
			}

			self::queue()->schedule_recurring( time(), $interval, $recurring_hook, array(), static::$group );
			return;
		}

		// Immediate import is on: the batch processor is not needed, so remove it if present.
		if ( ! self::has_existing_jobs( 'process_pending_batch', array() ) ) {
			return;
		}

		as_unschedule_all_actions( self::get_action( 'process_pending_batch' ), array(), static::$group );
	}

	/**
	 * Seed the cursor options the first time the batch processor is set up.
	 *
	 * When the date option does not exist yet, the date cursor is set to ten
	 * minutes before now (GMT) and the ID cursor to 0. Both options are stored
	 * without autoloading.
	 *
	 * @internal
	 */
	private static function initialize_last_processed_date() {
		$stored_date = get_option( self::LAST_PROCESSED_ORDER_DATE_OPTION, false );

		if ( false === $stored_date ) {
			// Start ten minutes in the past so orders created or updated during plugin
			// activation, upgrade, or just before the first import are not missed.
			$seed_timestamp = time() - ( 10 * MINUTE_IN_SECONDS );

			update_option( self::LAST_PROCESSED_ORDER_DATE_OPTION, gmdate( 'Y-m-d H:i:s', $seed_timestamp ), false );
			update_option( self::LAST_PROCESSED_ORDER_ID_OPTION, 0, false );
		}
	}

	/**
	 * Process pending orders in batch.
	 *
	 * This method queries for orders updated since the last cursor position
	 * (compound cursor: date + ID) and imports them into the analytics tables.
	 * The cursor is persisted after the batch, and a follow-up batch is scheduled
	 * immediately when a full batch was imported.
	 *
	 * Nothing is processed while a full import is in progress.
	 *
	 * @internal
	 * @param string|null $cursor_date Cursor date in 'Y-m-d H:i:s' format. Orders after this date will be processed.
	 *                                 Falls back to the stored option when null.
	 * @param int|null    $cursor_id   Cursor order ID. Combined with $cursor_date to form compound cursor.
	 *                                 Falls back to the stored option when null.
	 * @return void
	 */
	public static function process_pending_batch( $cursor_date = null, $cursor_id = null ) {
		$logger  = wc_get_logger();
		$context = array( 'source' => 'wc-analytics-order-import' );

		if ( self::is_importing() ) {
			// No need to process if an import is already in progress.
			$logger->info( 'Import is already in progress, skipping batch import.', $context );
			return;
		}

		// Load cursor position from options if not provided.
		$cursor_date = $cursor_date ?? get_option( self::LAST_PROCESSED_ORDER_DATE_OPTION );
		$cursor_id   = $cursor_id ?? (int) get_option( self::LAST_PROCESSED_ORDER_ID_OPTION, 0 );

		// Validate cursor date. strtotime() is compared against false explicitly because the
		// epoch fallback below parses to timestamp 0, which is valid but falsy.
		if ( ! is_string( $cursor_date ) || ! $cursor_date || false === strtotime( $cursor_date ) ) {
			$logger->error(
				'Invalid cursor date: ' . ( is_scalar( $cursor_date ) ? $cursor_date : gettype( $cursor_date ) ),
				$context
			);
			$cursor_date = gmdate( 'Y-m-d H:i:s', 0 ); // Fallback to the earliest possible date.
		}

		// Cast to int so the strict "full batch" comparison below still matches if the
		// batch size comes back as a numeric string (e.g. from a filter).
		$batch_size = (int) self::get_batch_size( 'process_pending_batch' );

		$logger->info(
			sprintf( 'Starting batch import. Cursor: %s (ID: %d), batch size: %d', $cursor_date, $cursor_id, $batch_size ),
			$context
		);

		$start_time = microtime( true );

		// Get orders updated since the cursor position.
		$orders = self::get_orders_since( $cursor_date, $cursor_id, $batch_size );

		if ( empty( $orders ) ) {
			$logger->info( 'No orders to process', $context );
			return;
		}

		$processed_count = 0;
		foreach ( $orders as $order ) {
			try {
				self::import( $order->id );
				++$processed_count;

				// Advance cursor after each successful import. Since orders are sorted by
				// date ASC, id ASC, we can simply overwrite with the current order's values.
				// If an exception is thrown, we break and save the last successful position.
				$cursor_date = $order->date_updated_gmt;
				$cursor_id   = $order->id;
			} catch ( \Exception $e ) {
				$logger->error(
					sprintf( 'Failed to import order %d: %s', $order->id, $e->getMessage() ),
					$context
				);
				break;
			}
		}

		// Save the updated cursor position.
		update_option( self::LAST_PROCESSED_ORDER_DATE_OPTION, $cursor_date, false );
		update_option( self::LAST_PROCESSED_ORDER_ID_OPTION, $cursor_id, false );

		$elapsed_time = microtime( true ) - $start_time;
		$logger->info(
			sprintf(
				'Batch import completed. Processed: %d orders in %.2f seconds. Cursor: %s (ID: %d)',
				$processed_count,
				$elapsed_time,
				$cursor_date,
				$cursor_id
			),
			$context
		);

		// If we got a full batch, there might be more orders to process.
		// Schedule immediate next batch.
		if ( $processed_count === $batch_size ) {
			$logger->info( 'Full batch processed, scheduling next batch', $context );
			self::schedule_action(
				'process_pending_batch',
				array( $cursor_date, $cursor_id )
			);
		}
	}

	/**
	 * Fetch the next batch of orders updated after the given cursor position.
	 *
	 * The cursor is compound (date + ID), so orders that share a timestamp can
	 * still be paged through when there are more of them than the batch size.
	 * The query runs against the HPOS orders table or the posts table, depending
	 * on which order storage is in use.
	 *
	 * @internal
	 * @param string $cursor_date Cursor date in 'Y-m-d H:i:s' format.
	 * @param int    $cursor_id   Cursor order ID.
	 * @param int    $limit       Number of orders to retrieve.
	 * @return array Array of objects with 'id' and 'date_updated_gmt' properties.
	 */
	private static function get_orders_since( $cursor_date, $cursor_id, $limit ) {
		$uses_orders_table = OrderUtil::custom_orders_table_usage_is_enabled();

		return $uses_orders_table
			? self::get_orders_since_from_orders_table( $cursor_date, $cursor_id, $limit )
			: self::get_orders_since_from_posts_table( $cursor_date, $cursor_id, $limit );
	}

	/**
	 * Read the next batch of updated orders from the HPOS orders table.
	 *
	 * The WHERE clause implements a compound (date, ID) cursor so pagination keeps
	 * working when many orders share one timestamp:
	 * - date > cursor_date picks up orders with newer timestamps;
	 * - date = cursor_date AND id > cursor_id continues within the same timestamp.
	 *
	 * Example: with a batch size of 100 and 1000 orders at '2024-01-01 10:00:00',
	 * the orders are covered in 10 batches without repeats or an endless loop.
	 *
	 * @internal
	 * @param string $cursor_date Cursor date in 'Y-m-d H:i:s' format.
	 * @param int    $cursor_id   Cursor order ID.
	 * @param int    $limit       Number of orders to retrieve.
	 * @return array Array of objects with 'id' and 'date_updated_gmt' properties.
	 */
	private static function get_orders_since_from_orders_table( $cursor_date, $cursor_id, $limit ) {
		global $wpdb;

		$orders_table = OrdersTableDataStore::get_orders_table_name();

		// Only the table name is interpolated; every value goes through prepare() below.
		// phpcs:disable WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQL.NotPrepared
		$sql = "SELECT id, date_updated_gmt
				FROM {$orders_table}
				WHERE type IN ('shop_order', 'shop_order_refund')
				AND status NOT IN ('wc-auto-draft', 'auto-draft', 'trash')
				AND (
					date_updated_gmt > %s
					OR (date_updated_gmt = %s AND id > %d)
				)
				ORDER BY date_updated_gmt ASC, id ASC
				LIMIT %d";

		$rows = $wpdb->get_results(
			$wpdb->prepare( $sql, $cursor_date, $cursor_date, $cursor_id, $limit )
		);
		// phpcs:enable WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQL.NotPrepared

		return $rows;
	}

	/**
	 * Read the next batch of updated orders from the posts table.
	 *
	 * Applies the same compound (date, ID) cursor as the HPOS variant, using
	 * post_modified_gmt and ID; the columns are aliased so each row exposes
	 * `id` and `date_updated_gmt`.
	 *
	 * @internal
	 * @param string $cursor_date Cursor date in 'Y-m-d H:i:s' format.
	 * @param int    $cursor_id   Cursor order ID.
	 * @param int    $limit       Number of orders to retrieve.
	 * @return array Array of objects with 'id' and 'date_updated_gmt' properties.
	 */
	private static function get_orders_since_from_posts_table( $cursor_date, $cursor_id, $limit ) {
		global $wpdb;

		// Only the posts table name is interpolated; every value goes through prepare() below.
		// phpcs:disable WordPress.DB.PreparedSQL.NotPrepared
		$sql = "SELECT ID as id, post_modified_gmt as date_updated_gmt
				FROM {$wpdb->posts}
				WHERE post_type IN ('shop_order', 'shop_order_refund')
				AND post_status NOT IN ('wc-auto-draft', 'auto-draft', 'trash')
				AND (
					post_modified_gmt > %s
					OR (post_modified_gmt = %s AND ID > %d)
				)
				ORDER BY post_modified_gmt ASC, ID ASC
				LIMIT %d";

		$rows = $wpdb->get_results(
			$wpdb->prepare( $sql, $cursor_date, $cursor_date, $cursor_id, $limit )
		);
		// phpcs:enable WordPress.DB.PreparedSQL.NotPrepared

		return $rows;
	}

	/**
	 * Remove a batch of orders from the analytics lookup data.
	 *
	 * Picks the lowest order IDs still present in wc_order_stats and hands each
	 * one to the order stats data store for deletion.
	 *
	 * @internal
	 * @param int $batch_size Number of items to delete.
	 * @return void
	 */
	public static function delete( $batch_size ) {
		global $wpdb;

		$ids_to_delete = $wpdb->get_col(
			$wpdb->prepare(
				"SELECT order_id FROM {$wpdb->prefix}wc_order_stats ORDER BY order_id ASC LIMIT %d",
				$batch_size
			)
		);

		foreach ( $ids_to_delete as $stats_order_id ) {
			OrdersStatsDataStore::delete_order( $stats_order_id );
		}
	}

	/**
	 * Tell whether orders should be imported immediately instead of in periodic batches.
	 *
	 * Reads IMMEDIATE_IMPORT_OPTION (defaulting to true when the option is not set)
	 * and interprets the stored value with FILTER_VALIDATE_BOOLEAN.
	 *
	 * @internal
	 * @return bool
	 */
	private static function is_immediate_import_enabled(): bool {
		return filter_var(
			get_option( self::IMMEDIATE_IMPORT_OPTION, true ),
			FILTER_VALIDATE_BOOLEAN
		);
	}
}