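The key line is row[@to] = row.delete(@from)
It deletes the from field (the key) and assigns its value to the to field. Because to is a new field, this adds a new key-value pair to the row hash.
Tip: deleting a key from a Hash returns the value that was stored under that key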
2.3.0 :016 > row = {:a => "b", :c => "d"}
=> {:a=>"b", :c=>"d"}
2.3.0 :017 > ap row
{
:a => "b",
:c => "d"
}
=> nil
2.3.0 :018 > tmp = row.delete(:c)
=> "d"
2.3.0 :019 > ap tmp
"d"
=> nil
2.3.0 :020 > ap row
{
:a => "b"
}
=> nil
2.3.0 :021 >
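The final run produces the expected result.
Data validity checks
To keep a format change or an anomaly in the source data from making the ETL job fail, we can check the data up front and catch such problems early.
Here we implement a simple blank-value check: if a blank value is found, it raises an exception with a message we define.
This requires adding a new gem to the Gemfile and installing it.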
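One consequence of the row.delete idiom is worth spelling out: Hash#delete returns nil when the key does not exist, so renaming a field that is missing from the row quietly produces a new field holding nil. The sketch below illustrates this and shows one possible stricter variant. The class name StrictRenameField and the key :no_such_field are hypothetical and are not part of the original code.

```ruby
# Hypothetical stricter variant of RenameField (an assumption, not the article's class).
class StrictRenameField
  def initialize(from:, to:)
    @from = from
    @to = to
  end

  def process(row)
    # Hash#fetch raises KeyError when the source key is absent,
    # whereas Hash#delete would return nil without complaint.
    value = row.fetch(@from)
    row.delete(@from)
    row[@to] = value
    row
  end
end

row = { numero_commande: "FA1986" }

# Plain delete on a missing key: the target field is created with a nil value.
lenient = row.dup
lenient[:invoice_number] = lenient.delete(:no_such_field)
p lenient

# Strict variant on a key that exists: behaves like the original rename.
p StrictRenameField.new(from: :numero_commande, to: :invoice_number).process(row.dup)
```

Whether a missing source field should be an error or a nil is a design decision for the pipeline. The original RenameField takes the lenient route.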
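To make "blank value" concrete, here is a minimal plain-Ruby sketch of such a test. The check implemented later in common.rb uses blank? from the facets gem. The helper blank_value? below is a hypothetical stand-in that only approximates that behaviour for nil, strings, and collections, and it may differ from facets for other types.

```ruby
# Hypothetical helper (an assumption): approximates a blank? test without any gem.
def blank_value?(value)
  return true if value.nil?
  # A string counts as blank when it is empty or contains only whitespace.
  return value.strip.empty? if value.is_a?(String)
  # Collections are blank when empty; anything else is treated as present.
  value.respond_to?(:empty?) ? value.empty? : false
end

[nil, "", "   ", "10,96", 0, []].each do |value|
  puts "#{value.inspect} blank? #{blank_value?(value)}"
end
```

For CSV input this matters because an empty cell typically arrives as nil or as an empty string, and both should count as missing.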
[root@h102 kiba]# vim Gemfile
[root@h102 kiba]# cat Gemfile
source 'https://gems.ruby-china.org'
gem 'kiba', '~> 0.6.0'
gem 'awesome_print'
gem "facets", require: false
[root@h102 kiba]# bundle install
Don't run Bundler as root. Bundler can ask for sudo if it is needed, and installing your bundle
as root will break this application for all non-root users on this machine.
Fetching gem metadata from https://gems.ruby-china.org/..
Fetching version metadata from https://gems.ruby-china.org/.
Resolving dependencies...
Using awesome_print 1.7.0
Installing facets 3.1.0
Using kiba 0.6.1
Using bundler 1.12.5
Bundle complete! 3 Gemfile dependencies, 4 gems now installed.
Use `bundle show [gemname]` to see where a bundled gem is installed.
[root@h102 kiba]#
Add a class, VerifyFieldsPresence, that checks the columns, and define its processing logic.
[root@h102 kiba]# vim common.rb
[root@h102 kiba]# cat common.rb
require 'csv'
class CsvSource
  def initialize(file, options)
    @file = file
    @options = options
  end
  def each
    CSV.foreach(@file, @options) do |row|
      yield row.to_hash
    end
  end
end
require 'awesome_print'
def show_me
  transform do |row|
    ap row
    row # always return the row to keep it in the pipeline
  end
end
class ParseFrenchFloat
  def initialize(from:, to:)
    @from = from
    @to = to
  end
  def process(row)
    row[@to] = Float(row[@from].gsub(',', '.'))
    row
  end
end
class ParseFrenchDate
  def initialize(from:, to:)
    @from = from
    @to = to
  end
  def process(row)
    row[@to] = Date.strptime(row[@from], '%d/%m/%Y').to_s
    row
  end
end
class RenameField
  def initialize(from:, to:)
    @from = from
    @to = to
  end
  def process(row)
    row[@to] = row.delete(@from)
    row
  end
end
require 'facets/kernel/blank'
class VerifyFieldsPresence
  def initialize(expected_fields)
    @expected_fields = expected_fields
  end
  def process(row)
    @expected_fields.each do |field|
      if row[field].blank?
        raise "Row lacks value for field #{field} - #{row.inspect}"
      end
    end
    row
  end
end
[root@h102 kiba]# vim convert-csv.etl
[root@h102 kiba]# cat convert-csv.etl
require_relative 'common'
# read from source CSV file
source CsvSource, 'commandes.csv', col_sep: ';', headers: true, header_converters: :symbol
#verify the source columns are there and provide a non-blank value
transform VerifyFieldsPresence, [:date_facture, :montant_eur, :numero_commande]
# Parse the numbers
transform ParseFrenchFloat, from: :montant_eur, to: :amount_eur
#Reformat the dates
transform ParseFrenchDate, from: :date_facture, to: :invoice_date
#Rename the remaining column
transform RenameField, from: :numero_commande, to: :invoice_number
# show details of row contents
show_me
[root@h102 kiba]# bundle exec kiba convert-csv.etl
{
:date_facture => "7/3/2015",
:montant_eur => "10,96",
:amount_eur => 10.96,
:invoice_date => "2015-03-07",
:invoice_number => "FA1986"
}
{
:date_facture => "7/3/2015",
:montant_eur => "85,11",
:amount_eur => 85.11,
:invoice_date => "2015-03-07",
:invoice_number => "FA1987"
}
{
:date_facture => "8/3/2015",
:montant_eur => "6,41",
:amount_eur => 6.41,
:invoice_date => "2015-03-08",
:invoice_number => "FA1988"
}
[root@h102 kiba]#
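The run above only shows the passing case, since every row in commandes.csv has all three fields. To see what the check does when a value is missing, the following sketch feeds an in-memory row to a plain-Ruby stand-in for the transform. The class name VerifyFieldsPresenceSketch, the inline blank test, and the row with a nil amount are all assumptions made for illustration. They are not taken from the source data, and the sketch runs without Kiba or facets.

```ruby
# Stand-in for VerifyFieldsPresence (hypothetical name); the blank test is
# written inline instead of using facets' blank?.
class VerifyFieldsPresenceSketch
  def initialize(expected_fields)
    @expected_fields = expected_fields
  end

  def process(row)
    @expected_fields.each do |field|
      value = row[field]
      # Treat nil, empty, and whitespace-only values as missing.
      if value.nil? || value.to_s.strip.empty?
        raise "Row lacks value for field #{field} - #{row.inspect}"
      end
    end
    row
  end
end

check = VerifyFieldsPresenceSketch.new([:date_facture, :montant_eur, :numero_commande])

good_row = { date_facture: "7/3/2015", montant_eur: "10,96", numero_commande: "FA1986" }
# Made-up row with a missing amount, for illustration only.
bad_row = { date_facture: "7/3/2015", montant_eur: nil, numero_commande: "FA1987" }

check.process(good_row) # passes through unchanged

begin
  check.process(bad_row)
rescue RuntimeError => e
  puts e.message
end
```

In a real Kiba run the exception would not be rescued like this. It would propagate and stop the job at the offending row, which is the point of placing the check before the parsing transforms.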