Refactor transaction enrichment to support batch processing #1803
@@ -1,7 +0,0 @@
|
||||
# Background job that runs data enrichment for a single account.
#
# Queued on :latency_high; the work itself is delegated to the account.
class EnrichDataJob < ApplicationJob
  queue_as :latency_high

  # account - object that responds to #enrich_data
  def perform(account)
    account.enrich_data
  end
end
|
||||
8
app/jobs/enrich_transaction_batch_job.rb
Normal file
8
app/jobs/enrich_transaction_batch_job.rb
Normal file
@@ -0,0 +1,8 @@
|
||||
# Background job that enriches one page of an account's transactions.
#
# Queued on :latency_high; the work is delegated to
# Account::DataEnricher#enrich_transaction_batch.
class EnrichTransactionBatchJob < ApplicationJob
  queue_as :latency_high

  # account    - the account handed to Account::DataEnricher
  # batch_size - page size forwarded to the enricher (default 100)
  # offset     - page offset forwarded to the enricher (default 0)
  def perform(account, batch_size = 100, offset = 0)
    Account::DataEnricher.new(account).enrich_transaction_batch(batch_size, offset)
  end
end
|
||||
@@ -130,10 +130,6 @@ class Account < ApplicationRecord
|
||||
DataEnricher.new(self).run
|
||||
end
|
||||
|
||||
# Enqueues an EnrichDataJob for this record via perform_later.
def enrich_data_later
  EnrichDataJob.perform_later(self)
end
|
||||
|
||||
def update_with_sync!(attributes)
|
||||
should_update_balance = attributes[:balance] && attributes[:balance].to_d != balance
|
||||
|
||||
|
||||
@@ -8,49 +8,61 @@ class Account::DataEnricher
|
||||
end
|
||||
|
|
||||
|
||||
def run
|
||||
enrich_transactions
|
||||
end
|
||||
total_unenriched = account.entries.account_transactions
|
||||
.joins("JOIN account_transactions at ON at.id = account_entries.entryable_id AND account_entries.entryable_type = 'Account::Transaction'")
|
||||
.where("account_entries.enriched_at IS NULL OR at.merchant_id IS NULL OR at.category_id IS NULL")
|
||||
.count
|
||||
|
||||
private
|
||||
def enrich_transactions
|
||||
candidates = account.entries.account_transactions.includes(entryable: [ :merchant, :category ])
|
||||
if total_unenriched > 0
|
||||
batch_size = 50
|
||||
batches = (total_unenriched.to_f / batch_size).ceil
|
||||
|
||||
Rails.logger.info("Enriching #{candidates.count} transactions for account #{account.id}")
|
||||
|
||||
merchants = {}
|
||||
|
||||
candidates.each do |entry|
|
||||
if entry.enriched_at.nil? || entry.entryable.merchant_id.nil? || entry.entryable.category_id.nil?
|
||||
begin
|
||||
next unless entry.name.present?
|
||||
|
||||
info = self.class.synth_provider.enrich_transaction(entry.name).info
|
||||
|
||||
next unless info.present?
|
||||
|
||||
if info.name.present?
|
||||
merchant = merchants[info.name] ||= account.family.merchants.find_or_create_by(name: info.name)
|
||||
|
||||
if info.icon_url.present?
|
||||
merchant.icon_url = info.icon_url
|
||||
end
|
||||
end
|
||||
|
||||
entryable_attributes = { id: entry.entryable_id }
|
||||
entryable_attributes[:merchant_id] = merchant.id if merchant.present? && entry.entryable.merchant_id.nil?
|
||||
|
||||
Account.transaction do
|
||||
merchant.save! if merchant.present?
|
||||
entry.update!(
|
||||
enriched_at: Time.current,
|
||||
enriched_name: info.name,
|
||||
entryable_attributes: entryable_attributes
|
||||
)
|
||||
end
|
||||
rescue => e
|
||||
Rails.logger.warn("Error enriching transaction #{entry.id}: #{e.message}")
|
||||
end
|
||||
end
|
||||
batches.times do |batch|
|
||||
EnrichTransactionBatchJob.perform_later(account, batch_size, batch * batch_size)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Enriches one page of this account's transaction entries that still need
# enrichment.
#
# An entry is a candidate when its enriched_at is NULL, or when the joined
# transaction row has no merchant_id or no category_id.
#
# batch_size - maximum number of entries to process in this call (default 50)
# offset     - number of candidate entries to skip before this page (default 0)
#
# A failure on an individual entry is logged as a warning and does not stop
# the rest of the page.
#
# NOTE(review): pages are addressed by offset into the *filtered* candidate
# set. Entries that stop matching the filter once enriched shift the offsets
# seen by later pages, so a later page may skip rows — presumably those are
# picked up by a subsequent run; confirm with the code that schedules batches.
#
# NOTE(review): category_id is part of the filter, but nothing here assigns
# it, so entries without a category appear to match on every run — confirm
# whether that is intended.
def enrich_transaction_batch(batch_size = 50, offset = 0)
  # OFFSET/LIMIT without ORDER BY yields rows in an unspecified order, which
  # lets separately executed pages overlap or miss rows. Ordering by the entry
  # id makes each page well defined. Loading the page once also avoids a
  # separate COUNT query just for the log line below.
  candidates = account.entries.account_transactions
    .includes(entryable: [ :merchant, :category ])
    .joins("JOIN account_transactions at ON at.id = account_entries.entryable_id AND account_entries.entryable_type = 'Account::Transaction'")
    .where("account_entries.enriched_at IS NULL OR at.merchant_id IS NULL OR at.category_id IS NULL")
    .order("account_entries.id")
    .offset(offset)
    .limit(batch_size)
    .to_a

  Rails.logger.info("Enriching batch of #{candidates.size} transactions for account #{account.id} (offset: #{offset})")

  # Per-call cache so each merchant name is looked up (or created) only once.
  merchants = {}

  candidates.each do |entry|
    begin
      info = self.class.synth_provider.enrich_transaction(entry.name).info

      next unless info.present?

      if info.name.present?
        merchant = merchants[info.name] ||= account.family.merchants.find_or_create_by(name: info.name)

        if info.icon_url.present?
          merchant.icon_url = info.icon_url
        end
      end

      # Only fill in the merchant when the transaction does not already have one.
      entryable_attributes = { id: entry.entryable_id }
      entryable_attributes[:merchant_id] = merchant.id if merchant.present? && entry.entryable.merchant_id.nil?

      # Persist the merchant change and the entry update atomically.
      Account.transaction do
        merchant.save! if merchant.present?
        entry.update!(
          enriched_at: Time.current,
          enriched_name: info.name,
          entryable_attributes: entryable_attributes
        )
      end
    rescue => e
      Rails.logger.warn("Error enriching transaction #{entry.id}: #{e.message}")
    end
  end
end
|
||||
end
|
||||
|
||||
@@ -15,8 +15,7 @@ class Account::Syncer
|
||||
|
||||
# Enrich if user opted in or if we're syncing transactions from a Plaid account on the hosted app
|
||||
if account.family.data_enrichment_enabled? || (account.plaid_account_id.present? && Rails.application.config.app_mode.hosted?)
|
||||
# Temporarily disable until optimizations complete
|
||||
# account.enrich_data_later
|
||||
account.enrich_data
|
||||
else
|
||||
Rails.logger.info("Data enrichment is disabled, skipping enrichment for account #{account.id}")
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user
Both `merchant_id` and `category_id` are stored on the `Account::Transaction` record, while `enriched_at` is stored on `Account::Entry` (delegated type).

Given we're primarily enriching `Account::Transaction` records (and not other entry types like `Account::Valuation` and `Account::Trade`), I think it probably makes sense to move the `enriched_at` field down to the `Account::Transaction` and only deal with those types of records in this process.

Since this was originally written, we've added null validations to ensure `name` is present, so this can safely be removed now.

Actually, second guessing the idea of moving `enriched_at` to `Account::Transaction`. This enrichment is also modifying `enriched_name` on the `Account::Entry` model, so it probably makes sense to keep this as-is.

Will still need to update this query to read `merchant_id` and `category_id` off the `Account::Transaction` record though.

@zachgoll Is the entryable join necessary? AI seems to think so.

The join is necessary, but `joins(:entryable)` is not possible since this is a polymorphic association and will throw an error. Here's what I would use: