Refactor transaction enrichment to support batch processing #1803

Merged
Shpigford merged 7 commits from data-enricher-optimization into main 2025-02-06 00:34:28 +08:00
5 changed files with 62 additions and 54 deletions

View File

@@ -1,7 +0,0 @@
class EnrichDataJob < ApplicationJob
queue_as :latency_high
def perform(account)
account.enrich_data
end
end

View File

@@ -0,0 +1,8 @@
class EnrichTransactionBatchJob < ApplicationJob
queue_as :latency_high
def perform(account, batch_size = 100, offset = 0)
enricher = Account::DataEnricher.new(account)
enricher.enrich_transaction_batch(batch_size, offset)
end
end

View File

@@ -130,10 +130,6 @@ class Account < ApplicationRecord
DataEnricher.new(self).run
end
def enrich_data_later
EnrichDataJob.perform_later(self)
end
def update_with_sync!(attributes)
should_update_balance = attributes[:balance] && attributes[:balance].to_d != balance

View File

@@ -8,49 +8,61 @@ class Account::DataEnricher
end
zachgoll commented 2025-02-05 04:50:39 +08:00 (Migrated from github.com)
Review

Both merchant_id and category_id are stored on the Account::Transaction record, while enriched_at is stored on Account::Entry (delegated type).

Given we're primarily enriching Account::Transaction records (and not other entry types like Account::Valuation and Account::Trade), I think it probably makes sense to move the enriched_at field down to the Account::Transaction and only deal with those types of records in this process.

Both `merchant_id` and `category_id` are stored on the `Account::Transaction` record, while `enriched_at` is stored on `Account::Entry` (delegated type). Given we're primarily enriching `Account::Transaction` records (and not other entry types like `Account::Valuation` and `Account::Trade`), I think it probably makes sense to _move_ the `enriched_at` field down to the `Account::Transaction` and only deal with those types of records in this process.
zachgoll commented 2025-02-05 04:53:59 +08:00 (Migrated from github.com)
Review

Since this was originally written, we've added null validations to ensure name is present, so this can safely be removed now.

Since this was originally written, we've added null validations to ensure `name` is present, so this can safely be removed now.
zachgoll commented 2025-02-05 05:10:27 +08:00 (Migrated from github.com)
Review

Actually, second guessing the idea of moving enriched_at to Account::Transaction. This enrichment is also modifying enriched_name on the Account::Entry model, so it probably makes sense to keep this as-is.

Will still need to update this query to read merchant_id and category_id off the Account::Transaction record though.

Actually, second guessing the idea of moving `enriched_at` to `Account::Transaction`. This enrichment is also modifying `enriched_name` on the `Account::Entry` model, so it probably makes sense to keep this as-is. Will still need to update this query to read `merchant_id` and `category_id` off the `Account::Transaction` record though.
Shpigford commented 2025-02-05 09:31:52 +08:00 (Migrated from github.com)
Review

@zachgoll Is the entryable join necessary? AI seems to think so.

@zachgoll Is the entryable join necessary? AI seems to think so.
zachgoll commented 2025-02-05 22:07:46 +08:00 (Migrated from github.com)
Review

The join is necessary, but joins(:entryable) is not possible since this is a polymorphic association and will throw an error. Here's what I would use:

account.entries.joins("JOIN account_transactions at ON at.id = account_entries.entryable_id AND account_entries.entryable_type = 'Account::Transaction'").where("account_entries.enriched_at IS NULL OR at.category_id IS NULL
 OR at.merchant_id IS NULL")
The join is necessary, but `joins(:entryable)` is not possible since this is a polymorphic association and will throw an error. Here's what I would use: ```rb account.entries.joins("JOIN account_transactions at ON at.id = account_entries.entryable_id AND account_entries.entryable_type = 'Account::Transaction'").where("account_entries.enriched_at IS NULL OR at.category_id IS NULL OR at.merchant_id IS NULL") ```
def run
enrich_transactions
end
total_unenriched = account.entries.account_transactions
.joins("JOIN account_transactions at ON at.id = account_entries.entryable_id AND account_entries.entryable_type = 'Account::Transaction'")
.where("account_entries.enriched_at IS NULL OR at.merchant_id IS NULL OR at.category_id IS NULL")
.count
private
def enrich_transactions
candidates = account.entries.account_transactions.includes(entryable: [ :merchant, :category ])
if total_unenriched > 0
batch_size = 50
batches = (total_unenriched.to_f / batch_size).ceil
Rails.logger.info("Enriching #{candidates.count} transactions for account #{account.id}")
merchants = {}
candidates.each do |entry|
if entry.enriched_at.nil? || entry.entryable.merchant_id.nil? || entry.entryable.category_id.nil?
begin
next unless entry.name.present?
info = self.class.synth_provider.enrich_transaction(entry.name).info
next unless info.present?
if info.name.present?
merchant = merchants[info.name] ||= account.family.merchants.find_or_create_by(name: info.name)
if info.icon_url.present?
merchant.icon_url = info.icon_url
end
end
entryable_attributes = { id: entry.entryable_id }
entryable_attributes[:merchant_id] = merchant.id if merchant.present? && entry.entryable.merchant_id.nil?
Account.transaction do
merchant.save! if merchant.present?
entry.update!(
enriched_at: Time.current,
enriched_name: info.name,
entryable_attributes: entryable_attributes
)
end
rescue => e
Rails.logger.warn("Error enriching transaction #{entry.id}: #{e.message}")
end
end
batches.times do |batch|
EnrichTransactionBatchJob.perform_later(account, batch_size, batch * batch_size)
end
end
end
def enrich_transaction_batch(batch_size = 50, offset = 0)
candidates = account.entries.account_transactions
.includes(entryable: [ :merchant, :category ])
.joins("JOIN account_transactions at ON at.id = account_entries.entryable_id AND account_entries.entryable_type = 'Account::Transaction'")
.where("account_entries.enriched_at IS NULL OR at.merchant_id IS NULL OR at.category_id IS NULL")
.offset(offset)
.limit(batch_size)
Rails.logger.info("Enriching batch of #{candidates.count} transactions for account #{account.id} (offset: #{offset})")
merchants = {}
candidates.each do |entry|
begin
info = self.class.synth_provider.enrich_transaction(entry.name).info
next unless info.present?
if info.name.present?
merchant = merchants[info.name] ||= account.family.merchants.find_or_create_by(name: info.name)
if info.icon_url.present?
merchant.icon_url = info.icon_url
end
end
entryable_attributes = { id: entry.entryable_id }
entryable_attributes[:merchant_id] = merchant.id if merchant.present? && entry.entryable.merchant_id.nil?
Account.transaction do
merchant.save! if merchant.present?
entry.update!(
enriched_at: Time.current,
enriched_name: info.name,
entryable_attributes: entryable_attributes
)
end
rescue => e
Rails.logger.warn("Error enriching transaction #{entry.id}: #{e.message}")
end
end
end
end

View File

@@ -15,8 +15,7 @@ class Account::Syncer
# Enrich if user opted in or if we're syncing transactions from a Plaid account on the hosted app
if account.family.data_enrichment_enabled? || (account.plaid_account_id.present? && Rails.application.config.app_mode.hosted?)
# Temporarily disable until optimizations complete
# account.enrich_data_later
account.enrich_data
else
Rails.logger.info("Data enrichment is disabled, skipping enrichment for account #{account.id}")
end