ETL (Extract-Transform-Load) with Kiba(4)

2021-10-18 11:58:08 浏览数 (3)

其中最主要的就是 row[@to] = row.delete(@from)

它的意思就是删除 from 字段(或 Key) ,将其中的值赋予给 to 字段,这个字段是新字段,在 row hash 中添加入新的 KV 对

Tip: 删除 Hash 中的一个 Key 时会反馈其值

代码语言:javascript复制
2.3.0 :016 > row = {:a => "b", :c => "d"}
 => {:a=>"b", :c=>"d"} 
2.3.0 :017 > ap row
{
    :a => "b",
    :c => "d"
}
 => nil 
2.3.0 :018 > tmp = row.delete(:c)
 => "d" 
2.3.0 :019 > ap tmp
"d"
 => nil 
2.3.0 :020 > ap row
{
    :a => "b"
}
 => nil 
2.3.0 :021 >

最后运行的结果正如预期


数据有效性检查

为了防止源数据的格式变动或异常造成ETL任务的失败,我们可以对数据进行提前检查,以预防此类问题的发生

这里实现一个简单的空值检测,如果发现空值,就抛出定义的异常信息

这里需要加入一个新的 gem 到 Gemfile 中,并且进行安装

代码语言:javascript复制
[root@h102 kiba]# vim Gemfile
[root@h102 kiba]# cat Gemfile
source 'https://gems.ruby-china.org'

gem 'kiba', '~> 0.6.0'
gem 'awesome_print'
gem "facets", require: false
[root@h102 kiba]# bundle install 
Don't run Bundler as root. Bundler can ask for sudo if it is needed, and installing your bundle
as root will break this application for all non-root users on this machine.
Fetching gem metadata from https://gems.ruby-china.org/..
Fetching version metadata from https://gems.ruby-china.org/.
Resolving dependencies...
Using awesome_print 1.7.0
Installing facets 3.1.0
Using kiba 0.6.1
Using bundler 1.12.5
Bundle complete! 3 Gemfile dependencies, 4 gems now installed.
Use `bundle show [gemname]` to see where a bundled gem is installed.
[root@h102 kiba]# 

加入对列进行检查的类 VerifyFieldsPresence ,并定义处理逻辑

代码语言:javascript复制
[root@h102 kiba]# vim common.rb 
[root@h102 kiba]# cat common.rb 
require 'csv'

class CsvSource
  def initialize(file, options)
    @file = file
    @options = options
  end
  
  def each
    CSV.foreach(@file, @options) do |row|
      yield row.to_hash
    end
  end
end


require 'awesome_print'

def show_me
  transform do |row|
    ap row
    row # always return the row to keep it in the pipeline
  end
end


class ParseFrenchFloat
  def initialize(from:, to:)
    @from = from
    @to = to
  end
  
  def process(row)
    row[@to] = Float(row[@from].gsub(',', '.'))
    row
  end
end


class ParseFrenchDate
  def initialize(from:, to:)
    @from = from
    @to = to
  end
  
  def process(row)
    row[@to] = Date.strptime(row[@from], '%d/%m/%Y').to_s
    row
  end
end


class RenameField
  def initialize(from:, to:)
    @from = from
    @to = to
  end
  
  def process(row)
    row[@to] = row.delete(@from)
    row
  end
end


require 'facets/kernel/blank'

class VerifyFieldsPresence
  def initialize(expected_fields)
    @expected_fields = expected_fields
  end
  
  def process(row)
    @expected_fields.each do |field|
      if row[field].blank?
        raise "Row lacks value for field #{field} - #{row.inspect}"
      end
    end
    row
  end
end
[root@h102 kiba]# vim convert-csv.etl 
[root@h102 kiba]# cat convert-csv.etl 
require_relative 'common'

# read from source CSV file
source CsvSource, 'commandes.csv', col_sep: ';', headers: true, header_converters: :symbol

#verify the source columns are there and provide a non-blank value
transform VerifyFieldsPresence, [:date_facture, :montant_eur, :numero_commande]

# Parse the numbers
transform ParseFrenchFloat, from: :montant_eur, to: :amount_eur

#Reformat the dates
transform ParseFrenchDate, from: :date_facture, to: :invoice_date

#Rename the remaining column
transform RenameField, from: :numero_commande, to: :invoice_number

# show details of row contents
show_me
[root@h102 kiba]# bundle exec kiba convert-csv.etl
{
      :date_facture => "7/3/2015",
       :montant_eur => "10,96",
        :amount_eur => 10.96,
      :invoice_date => "2015-03-07",
    :invoice_number => "FA1986"
}
{
      :date_facture => "7/3/2015",
       :montant_eur => "85,11",
        :amount_eur => 85.11,
      :invoice_date => "2015-03-07",
    :invoice_number => "FA1987"
}
{
      :date_facture => "8/3/2015",
       :montant_eur => "6,41",
        :amount_eur => 6.41,
      :invoice_date => "2015-03-08",
    :invoice_number => "FA1988"
}
[root@h102 kiba]# 

0 人点赞