From 60b52a10fb4cf5fd84b568d827326de72b3d078b Mon Sep 17 00:00:00 2001 From: Antonio Piccolboni Date: Mon, 17 Feb 2014 13:52:23 -0800 Subject: [PATCH 01/18] added avro format, depends on RAVRO Still problems with class path and attaching bit64 --- pkg/DESCRIPTION | 2 +- pkg/R/IO.R | 756 +++++++++++++++++++++++++----------------------- 2 files changed, 394 insertions(+), 364 deletions(-) diff --git a/pkg/DESCRIPTION b/pkg/DESCRIPTION index 37d5ef19..ed86a0e9 100644 --- a/pkg/DESCRIPTION +++ b/pkg/DESCRIPTION @@ -5,7 +5,7 @@ Version: 2.3.0 Date: 2013-10-1 Author: Revolution Analytics Depends: R (>= 2.6.0), Rcpp, RJSONIO (>= 0.8-2), bitops, digest, functional, stringr, plyr, reshape2 -Suggests: quickcheck +Suggests: quickcheck, ravro Collate: basic.R keyval.R IO.R local.R streaming.R mapreduce.R extras.R quickcheck-rmr.R parse-url.R Maintainer: Revolution Analytics Description: Supports the map reduce programming model on top of hadoop streaming diff --git a/pkg/R/IO.R b/pkg/R/IO.R index 57fac652..e3b4266b 100644 --- a/pkg/R/IO.R +++ b/pkg/R/IO.R @@ -14,411 +14,441 @@ make.json.input.format = - function(key.class = rmr2:::qw(list, vector, data.frame, matrix), - value.class = rmr2:::qw(list, vector, data.frame, matrix)) { #leave the pkg qualifier in here - key.class = match.arg(key.class) - value.class = match.arg(value.class) - cast = - function(class) - switch( - class, - list = identity, - vector = as.vector, - data.frame = function(x) do.call(data.frame, x), - matrix = function(x) do.call(rbind, x)) - process.field = - function(field, class) - cast(class)(fromJSON(field, asText = TRUE)) - function(con, keyval.length) { - lines = readLines(con, keyval.length) - if (length(lines) == 0) NULL - else { - splits = strsplit(lines, "\t") - c.keyval( - lapply(splits, - function(x) - if(length(x) == 1) - keyval(NULL, process.field(x[1], value.class)) - else - keyval(process.field(x[1], key.class), process.field(x[2], value.class))))}}} + function(key.class = rmr2:::qw(list, vector, data.frame, matrix), + value.class = rmr2:::qw(list, vector, data.frame, matrix)) { #leave the pkg qualifier in here + key.class = match.arg(key.class) + value.class = match.arg(value.class) + cast = + function(class) + switch( + class, + list = identity, + vector = as.vector, + data.frame = function(x) do.call(data.frame, x), + matrix = function(x) do.call(rbind, x)) + process.field = + function(field, class) + cast(class)(fromJSON(field, asText = TRUE)) + function(con, keyval.length) { + lines = readLines(con, keyval.length) + if (length(lines) == 0) NULL + else { + splits = strsplit(lines, "\t") + c.keyval( + lapply(splits, + function(x) + if(length(x) == 1) + keyval(NULL, process.field(x[1], value.class)) + else + keyval(process.field(x[1], key.class), process.field(x[2], value.class))))}}} json.output.format = function(kv, con) { - ser = function(k, v) paste(gsub("\n", "", toJSON(k, .escapeEscapes=TRUE, collapse = "")), - gsub("\n", "", toJSON(v, .escapeEscapes=TRUE, collapse = "")), - sep = "\t") - out = reduce.keyval(kv, ser, rmr.options('keyval.length')) - writeLines(paste(out, collapse = "\n"), sep = "", con = con)} + ser = function(k, v) paste(gsub("\n", "", toJSON(k, .escapeEscapes=TRUE, collapse = "")), + gsub("\n", "", toJSON(v, .escapeEscapes=TRUE, collapse = "")), + sep = "\t") + out = reduce.keyval(kv, ser, rmr.options('keyval.length')) + writeLines(paste(out, collapse = "\n"), sep = "", con = con)} text.input.format = function(con, keyval.length) { - lines = readLines(con, keyval.length) - if (length(lines) == 0) NULL - else keyval(NULL, lines)} + lines = readLines(con, keyval.length) + if (length(lines) == 0) NULL + else keyval(NULL, lines)} text.output.format = function(kv, con) { - ser = function(k, v) paste(k, v, collapse = "", sep = "\t") - out = reduce.keyval(kv, ser, length.keyval(kv)) - writeLines(paste(out, "\n", collapse="", sep = ""), sep = "", con = con)} + ser = function(k, v) paste(k, v, collapse = "", sep = "\t") + out = reduce.keyval(kv, ser, length.keyval(kv)) + writeLines(paste(out, "\n", collapse="", sep = ""), sep = "", con = con)} make.csv.input.format = function(...) function(con, keyval.length) { - df = - tryCatch( - read.table(file = con, nrows = keyval.length, header = FALSE, ...), - error = - function(e) { - if(e$message != "no lines available in input") - stop(e$message) - NULL}) - if(is.null(df) || dim(df)[[1]] == 0) NULL - else keyval(NULL, df)} + df = + tryCatch( + read.table(file = con, nrows = keyval.length, header = FALSE, ...), + error = + function(e) { + if(e$message != "no lines available in input") + stop(e$message) + NULL}) + if(is.null(df) || dim(df)[[1]] == 0) NULL + else keyval(NULL, df)} make.csv.output.format = function(...) function(kv, con) { - kv = recycle.keyval(kv) - k = keys(kv) - v = values(kv) - write.table(file = con, - x = if(is.null(k)) v else cbind(k, v), - ..., - row.names = FALSE, - col.names = FALSE)} + kv = recycle.keyval(kv) + k = keys(kv) + v = values(kv) + write.table(file = con, + x = if(is.null(k)) v else cbind(k, v), + ..., + row.names = FALSE, + col.names = FALSE)} typedbytes.reader = function(data, nobjs) { - if(is.null(data)) NULL - else - .Call("typedbytes_reader", data, nobjs, PACKAGE = "rmr2")} + if(is.null(data)) NULL + else + .Call("typedbytes_reader", data, nobjs, PACKAGE = "rmr2")} typedbytes.writer = function(objects, con, native) { - writeBin( - .Call("typedbytes_writer", objects, native, PACKAGE = "rmr2"), - con)} + writeBin( + .Call("typedbytes_writer", objects, native, PACKAGE = "rmr2"), + con)} make.typedbytes.input.format = function(recycle = TRUE) { - obj.buffer = list() - obj.buffer.rmr.length = 0 - raw.buffer = raw() - read.size = 100 - function(con, keyval.length) { - while(length(obj.buffer) < 2 || - obj.buffer.rmr.length < keyval.length) { - raw.buffer <<- c(raw.buffer, readBin(con, raw(), read.size)) - if(length(raw.buffer) == 0) break; - parsed = typedbytes.reader(raw.buffer, as.integer(read.size/2)) - obj.buffer <<- c(obj.buffer, parsed$objects) - approx.read.records = { - if(length(parsed$objects) == 0) 0 - else - sum( - sapply(sample(parsed$objects, 10, replace = T), rmr.length)) * - length(parsed$objects)/10.0 } - obj.buffer.rmr.length <<- obj.buffer.rmr.length + approx.read.records - read.size <<- ceiling(1.1^sign(keyval.length - obj.buffer.rmr.length) * read.size) - if(parsed$length != 0) raw.buffer <<- raw.buffer[-(1:parsed$length)]} - straddler = list() - retval = - if(length(obj.buffer) == 0) NULL - else { - if(length(obj.buffer)%%2 ==1) { - straddler = obj.buffer[length(obj.buffer)] - obj.buffer <<- obj.buffer[-length(obj.buffer)]} - kk = odd(obj.buffer) - vv = even(obj.buffer) - if(recycle) { - keyval( - c.or.rbind.rep(kk, sapply.rmr.length(vv)), - c.or.rbind(vv))} - else { - keyval(kk, vv)}} - obj.buffer <<- straddler - obj.buffer.rmr.length <<- 0 - retval}} - + obj.buffer = list() + obj.buffer.rmr.length = 0 + raw.buffer = raw() + read.size = 100 + function(con, keyval.length) { + while(length(obj.buffer) < 2 || + obj.buffer.rmr.length < keyval.length) { + raw.buffer <<- c(raw.buffer, readBin(con, raw(), read.size)) + if(length(raw.buffer) == 0) break; + parsed = typedbytes.reader(raw.buffer, as.integer(read.size/2)) + obj.buffer <<- c(obj.buffer, parsed$objects) + approx.read.records = { + if(length(parsed$objects) == 0) 0 + else + sum( + sapply(sample(parsed$objects, 10, replace = T), rmr.length)) * + length(parsed$objects)/10.0 } + obj.buffer.rmr.length <<- obj.buffer.rmr.length + approx.read.records + read.size <<- ceiling(1.1^sign(keyval.length - obj.buffer.rmr.length) * read.size) + if(parsed$length != 0) raw.buffer <<- raw.buffer[-(1:parsed$length)]} + straddler = list() + retval = + if(length(obj.buffer) == 0) NULL + else { + if(length(obj.buffer)%%2 ==1) { + straddler = obj.buffer[length(obj.buffer)] + obj.buffer <<- obj.buffer[-length(obj.buffer)]} + kk = odd(obj.buffer) + vv = even(obj.buffer) + if(recycle) { + keyval( + c.or.rbind.rep(kk, sapply.rmr.length(vv)), + c.or.rbind(vv))} + else { + keyval(kk, vv)}} + obj.buffer <<- straddler + obj.buffer.rmr.length <<- 0 + retval}} + make.native.input.format = make.typedbytes.input.format make.native.or.typedbytes.output.format = - function(keyval.length, native) - function(kv, con){ - kvs = split.keyval(kv, keyval.length) - typedbytes.writer(interleave(keys(kvs), values(kvs)), con, native)} + function(keyval.length, native) + function(kv, con){ + kvs = split.keyval(kv, keyval.length) + typedbytes.writer(interleave(keys(kvs), values(kvs)), con, native)} make.native.output.format = Curry(make.native.or.typedbytes.output.format, native = TRUE) make.typedbytes.output.format = Curry(make.native.or.typedbytes.output.format, native = FALSE) pRawToChar = - function(rl) - .Call("raw_list_to_character", rl, PACKAGE="rmr2") + function(rl) + .Call("raw_list_to_character", rl, PACKAGE="rmr2") hbase.rec.to.data.frame = - function( - source, - atomic, - dense, - key.deserialize = pRawToChar, - cell.deserialize = - function(x, column, family) pRawToChar(x)) { - filler = replicate(length(unlist(source))/2, NULL) - dest = - list( - key = filler, - family = filler, - column = filler, - cell = filler) - tmp = - .Call( - "hbase_to_df", - source, - dest, - PACKAGE="rmr2") - retval = data.frame( - key = - I( - key.deserialize( - tmp$data.frame$key[1:tmp$nrows])), - family = - pRawToChar( - tmp$data.frame$family[1:tmp$nrows]), - column = - pRawToChar( - tmp$data.frame$column[1:tmp$nrows]), - cell = - I( - cell.deserialize( - tmp$data.frame$cell[1:tmp$nrows], - tmp$data.frame$family[1:tmp$nrows], - tmp$data.frame$column[1:tmp$nrows]))) - if(atomic) - retval = - as.data.frame( - lapply( - retval, - function(x) if(is.factor(x)) x else unclass(x))) - if(dense) retval = dcast(retval, key ~ family + column) - retval} + function( + source, + atomic, + dense, + key.deserialize = pRawToChar, + cell.deserialize = + function(x, column, family) pRawToChar(x)) { + filler = replicate(length(unlist(source))/2, NULL) + dest = + list( + key = filler, + family = filler, + column = filler, + cell = filler) + tmp = + .Call( + "hbase_to_df", + source, + dest, + PACKAGE="rmr2") + retval = data.frame( + key = + I( + key.deserialize( + tmp$data.frame$key[1:tmp$nrows])), + family = + pRawToChar( + tmp$data.frame$family[1:tmp$nrows]), + column = + pRawToChar( + tmp$data.frame$column[1:tmp$nrows]), + cell = + I( + cell.deserialize( + tmp$data.frame$cell[1:tmp$nrows], + tmp$data.frame$family[1:tmp$nrows], + tmp$data.frame$column[1:tmp$nrows]))) + if(atomic) + retval = + as.data.frame( + lapply( + retval, + function(x) if(is.factor(x)) x else unclass(x))) + if(dense) retval = dcast(retval, key ~ family + column) + retval} make.hbase.input.format = - function(dense, atomic, key.deserialize, cell.deserialize) { - deserialize.opt = - function(deser) { - if(is.null(deser)) deser = "raw" - if(is.character(deser)) - deser = - switch( - deser, - native = - function(x, family = NULL, column = NULL) lapply(x, unserialize), - typedbytes = - function(x, family = NULL, column = NULL) - typedbytes.reader( - do.call(c, x), - nobjs = length(x)), - raw = function(x, family = NULL, column = NULL) pRawToChar(x)) - deser} - key.deserialize = deserialize.opt(key.deserialize) - cell.deserialize = deserialize.opt(cell.deserialize) - tif = make.typedbytes.input.format(recycle = FALSE) - if(is.null(dense)) dense = FALSE - function(con, keyval.length) { - rec = tif(con, keyval.length) - if(is.null(rec)) NULL - else { - df = hbase.rec.to.data.frame(rec, atomic, dense, key.deserialize, cell.deserialize) - keyval(NULL, df)}}} + function(dense, atomic, key.deserialize, cell.deserialize) { + deserialize.opt = + function(deser) { + if(is.null(deser)) deser = "raw" + if(is.character(deser)) + deser = + switch( + deser, + native = + function(x, family = NULL, column = NULL) lapply(x, unserialize), + typedbytes = + function(x, family = NULL, column = NULL) + typedbytes.reader( + do.call(c, x), + nobjs = length(x)), + raw = function(x, family = NULL, column = NULL) pRawToChar(x)) + deser} + key.deserialize = deserialize.opt(key.deserialize) + cell.deserialize = deserialize.opt(cell.deserialize) + tif = make.typedbytes.input.format(recycle = FALSE) + if(is.null(dense)) dense = FALSE + function(con, keyval.length) { + rec = tif(con, keyval.length) + if(is.null(rec)) NULL + else { + df = hbase.rec.to.data.frame(rec, atomic, dense, key.deserialize, cell.deserialize) + keyval(NULL, df)}}} data.frame.to.nested.map = function(x,ind) { - if(length(ind)>0 && nrow(x) > 0) { - spl = split(x, x[,ind[1]]) - lapply(x[,ind[1]], function(y) keyval(as.character(y), data.frame.to.nested.map(spl[[y]], ind[-1])))} - else x$value} + if(length(ind)>0 && nrow(x) > 0) { + spl = split(x, x[,ind[1]]) + lapply(x[,ind[1]], function(y) keyval(as.character(y), data.frame.to.nested.map(spl[[y]], ind[-1])))} + else x$value} hbdf.to.m3 = Curry(data.frame.to.nested.map, ind = c("key", "family", "column")) # I/O make.keyval.readwriter = - function(mode, format, keyval.length, con = NULL, read) { - if(is.null(con)) - con = { - if(mode == "text") { - if(read) file("stdin", "r") #not stdin() which is parsed by the interpreter - else stdout()} - else { - cat = { - if(.Platform$OS.type == "windows") - paste( - "\"", - system.file( - package="rmr2", - "bin", - .Platform$r_arch, - "catwin.exe"), - "\"", - sep="") - else - "cat"} - pipe(cat, ifelse(read, "rb", "wb"))}} - if (read) { - function() - format(con, keyval.length)} - else { - function(kv) - format(kv, con)}} + function(mode, format, keyval.length, con = NULL, read) { + if(is.null(con)) + con = { + if(mode == "text") { + if(read) file("stdin", "r") #not stdin() which is parsed by the interpreter + else stdout()} + else { + cat = { + if(.Platform$OS.type == "windows") + paste( + "\"", + system.file( + package="rmr2", + "bin", + .Platform$r_arch, + "catwin.exe"), + "\"", + sep="") + else + "cat"} + pipe(cat, ifelse(read, "rb", "wb"))}} + if (read) { + function() + format(con, keyval.length)} + else { + function(kv) + format(kv, con)}} make.keyval.reader = Curry(make.keyval.readwriter, read = TRUE) make.keyval.writer = Curry(make.keyval.readwriter, keyval.length = NULL, read = FALSE) +paste.fromJSON = + function(...) + rjson::fromJSON(paste("[", paste(..., sep = ","), "]")) + +make.avro.input.format.function = + function(schema.file) { + schema = ravro:::avro_get_schema(file=schema.file) + function(con, n) { + lines = + readLines(con = con, n = n) + if (length(lines) == 0) NULL + else { + x = splat(paste.fromJSON)(lines) + y = ravro:::parse_avro(x, schema,encoded_unions=FALSE) + keyval(NULL, y)}}} + IO.formats = c("text", "json", "csv", "native", - "sequence.typedbytes", "hbase", - "pig.hive") + "sequence.typedbytes", "hbase", + "pig.hive", "avro") make.input.format = - function( - format = make.native.input.format(), - mode = c("binary", "text"), - streaming.format = NULL, - ...) { - mode = match.arg(mode) - backend.parameters = NULL - if(is.character(format)) { - format = match.arg(format, IO.formats) - switch( - format, - text = { - format = text.input.format - mode = "text"}, - json = { - format = make.json.input.format(...) - mode = "text"}, - csv = { - format = make.csv.input.format(...) - mode = "text"}, - native = { - format = make.native.input.format() - mode = "binary"}, - sequence.typedbytes = { - format = make.typedbytes.input.format() - mode = "binary"}, - pig.hive = { - format = - make.csv.input.format( - sep = "\001", - comment.char = "", - fill = TRUE, - flush = TRUE, - quote = "") - mode = "text"}, - hbase = { - optlist = list(...) - format = - make.hbase.input.format( - default(optlist$dense, F), - default(optlist$atomic, F), - default(optlist$key.deserialize, "raw"), - default(optlist$cell.deserialize, "raw")) - mode = "binary" - streaming.format = - "com.dappervision.hbase.mapred.TypedBytesTableInputFormat" - family.columns = optlist$family.columns - backend.parameters = - list( - hadoop = - list( - D = - paste( - "hbase.mapred.tablecolumns=", - sep = "", - paste( - collapse = " ", - sapply( - names(family.columns), - function(fam) - paste( - fam, - family.columns[[fam]], - sep = ":", - collapse = " ")))), - libjars = system.file(package = "rmr2", "hadoopy_hbase.jar")))})} - if(is.null(streaming.format) && mode == "binary") - streaming.format = "org.apache.hadoop.streaming.AutoInputFormat" - list(mode = mode, - format = format, - streaming.format = streaming.format, - backend.parameters = backend.parameters)} + function( + format = make.native.input.format(), + mode = c("binary", "text"), + streaming.format = NULL, + ...) { + mode = match.arg(mode) + backend.parameters = NULL + optlist = list(...) + if(is.character(format)) { + format = match.arg(format, IO.formats) + switch( + format, + text = { + format = text.input.format + mode = "text"}, + json = { + format = make.json.input.format(...) + mode = "text"}, + csv = { + format = make.csv.input.format(...) + mode = "text"}, + native = { + format = make.native.input.format() + mode = "binary"}, + sequence.typedbytes = { + format = make.typedbytes.input.format() + mode = "binary"}, + pig.hive = { + format = + make.csv.input.format( + sep = "\001", + comment.char = "", + fill = TRUE, + flush = TRUE, + quote = "") + mode = "text"}, + hbase = { + format = + make.hbase.input.format( + default(optlist$dense, F), + default(optlist$atomic, F), + default(optlist$key.deserialize, "raw"), + default(optlist$cell.deserialize, "raw")) + mode = "binary" + streaming.format = + "com.dappervision.hbase.mapred.TypedBytesTableInputFormat" + family.columns = optlist$family.columns + backend.parameters = + list( + hadoop = + list( + D = + paste( + "hbase.mapred.tablecolumns=", + sep = "", + paste( + collapse = " ", + sapply( + names(family.columns), + function(fam) + paste( + fam, + family.columns[[fam]], + sep = ":", + collapse = " ")))), + libjars = system.file(package = "rmr2", "hadoopy_hbase.jar")))}, + avro = { + format = make.avro.input.format.function(optlist$schema.file) + mode = "text" + streaming.format = "org.apache.avro.mapred.AvroAsTextInputFormat" + backend.parameters = + list( + hadoop = + list( + libjars = + paste( + ravro:::AVRO_TOOLS, + file.path(dirname(ravro:::AVRO_TOOLS), "avro-1.7.5.jar"), + sep= ",")))})} + if(is.null(streaming.format) && mode == "binary") + streaming.format = "org.apache.hadoop.streaming.AutoInputFormat" + list( + mode = mode, + format = format, + streaming.format = streaming.format, + backend.parameters = backend.parameters)} set.separator.options = - function(sep) { - if(!is.null(sep)) - list( - hadoop = - list( - D = - paste( - "mapred.textoutputformat.separator=", - sep, - sep = ""), - D = - paste( - "stream.map.output.field.separator=", - sep, - sep = ""), - D = - paste( - "stream.reduce.output.field.separator=", - sep, - sep = "")))} + function(sep) { + if(!is.null(sep)) + list( + hadoop = + list( + D = + paste( + "mapred.textoutputformat.separator=", + sep, + sep = ""), + D = + paste( + "stream.map.output.field.separator=", + sep, + sep = ""), + D = + paste( + "stream.reduce.output.field.separator=", + sep, + sep = "")))} make.output.format = - function( - format = make.native.output.format(keyval.length = rmr.options('keyval.length')), - mode = c("binary", "text"), - streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat", - ...) { - mode = match.arg(mode) - backend.parameters = NULL - if(is.character(format)) { - format = match.arg(format, IO.formats) - switch( - format, - text = { - format = text.output.format - mode = "text" - streaming.format = NULL}, - json = { - format = json.output.format - mode = "text" - streaming.format = NULL}, - csv = { - format = make.csv.output.format(...) - mode = "text" - streaming.format = NULL - sep = list(...)$sep - backend.parameters = set.separator.options(sep)}, - pig.hive = { - format = - make.csv.output.format( - sep = "\001", - quote = FALSE) - mode = "text" - streaming.format = NULL}, - native = { - format = make.native.output.format( - keyval.length = rmr.options('keyval.length')) - mode = "binary" - streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat"}, - sequence.typedbytes = { - format = make.typedbytes.output.format(keyval.length = rmr.options('keyval.length')) - mode = "binary" - streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat"}, - hbase = { - stop("hbase output format not implemented yet") - format = make.typedbytes.output.format(recycle = FALSE) - mode = "binary" - streaming.format = "com.dappervision.mapreduce.TypedBytesTableOutputFormat" - backend.parameters = - list( - hadoop = - list( - D = paste( - "hbase.mapred.tablecolumns=", - list(...)$family, - ":", - list(...)$column, - sep = ""), - libjars = system.file(package = "rmr2", "java/hadoopy_hbase.jar")))})} - mode = match.arg(mode) - list(mode = mode, format = format, streaming.format = streaming.format, backend.parameters = backend.parameters)} + function( + format = make.native.output.format(keyval.length = rmr.options('keyval.length')), + mode = c("binary", "text"), + streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat", + ...) { + mode = match.arg(mode) + backend.parameters = NULL + if(is.character(format)) { + format = match.arg(format, IO.formats) + switch( + format, + text = { + format = text.output.format + mode = "text" + streaming.format = NULL}, + json = { + format = json.output.format + mode = "text" + streaming.format = NULL}, + csv = { + format = make.csv.output.format(...) + mode = "text" + streaming.format = NULL + sep = list(...)$sep + backend.parameters = set.separator.options(sep)}, + pig.hive = { + format = + make.csv.output.format( + sep = "\001", + quote = FALSE) + mode = "text" + streaming.format = NULL}, + native = { + format = make.native.output.format( + keyval.length = rmr.options('keyval.length')) + mode = "binary" + streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat"}, + sequence.typedbytes = { + format = make.typedbytes.output.format(keyval.length = rmr.options('keyval.length')) + mode = "binary" + streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat"}, + hbase = { + stop("hbase output format not implemented yet") + format = make.typedbytes.output.format(recycle = FALSE) + mode = "binary" + streaming.format = "com.dappervision.mapreduce.TypedBytesTableOutputFormat" + backend.parameters = + list( + hadoop = + list( + D = paste( + "hbase.mapred.tablecolumns=", + list(...)$family, + ":", + list(...)$column, + sep = ""), + libjars = system.file(package = "rmr2", "java/hadoopy_hbase.jar")))})} + mode = match.arg(mode) + list(mode = mode, format = format, streaming.format = streaming.format, backend.parameters = backend.parameters)} From 52dcd5bfae2b5dcf79f2dded04cb53576677352b Mon Sep 17 00:00:00 2001 From: Antonio Piccolboni Date: Wed, 26 Feb 2014 14:24:42 -0800 Subject: [PATCH 02/18] add avro blurb --- pkg/man/make.io.format.Rd | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/man/make.io.format.Rd b/pkg/man/make.io.format.Rd index 6838c324..ab719ccf 100644 --- a/pkg/man/make.io.format.Rd +++ b/pkg/man/make.io.format.Rd @@ -18,10 +18,10 @@ make.output.format(format = make.native.output.format( keyval.length = rmr.opt \item{format}{Either a string describing a predefined combination of IO settings (possibilities include: \code{"text"}, \code{"json"}, \code{"csv"}, \code{"native"},\code{"sequence.typedbytes"}, \code{"hbase"}, \code{"pig.hive"}) or a function. For an input format, this function accepts a connection and a number of records and returns a key-value pair (see \code{\link{keyval}}). For an output format, this function accepts a key-value pair and a connection and writes the former to the latter.} \item{mode}{Mode can be either \code{"text"} or \code{"binary"}, which tells R what type of connection to use when opening the IO connections.} \item{streaming.format}{Class to pass to hadoop streaming as \code{inputformat} or \code{outputformat} option. This class is the first in the input chain to perform its duties on the input side and the last on the output side. Right now this option is not honored in local mode.} - \item{\dots}{Additional arguments to the format function. For the csv format they detail the specifics of the csv dialect to use and are the same as for \code{\link{read.table}} and \code{\link{write.table}} for the input and output resp, with the exception of \code{header, file, x, nrows, col.names} and \code{row.names}, the latter two allowed for input only. For \code{"json"}, only on the input side, one can specify a \code{key.class} and a \code{value.class} to help in mapping the JSON data model to R's own more flexibly. For the \code{"native"} and \code{"sequence.typedbytes"} output formats the user can specify a \code{keyval.length} that says how many values to map to a single physical key-value pair when the key is \code{NULL}. For the \code{"hbase"} format, the table name is provided as the input argument to \code{\link{mapreduce}}. Additional arguments are: \code{family.columns}, a named list where the names are family names and the elements are lists of column names within each family; \code{key.deserialize} and \code{cell.deserialize} that control the deserialization of keys and cells resp. and can take a string value or a function (explained below); \code{dense} which contols whether the data read from hbase is returned as a 4-column data frame (key, family, column and cell) or a number of columns equal to the number of columns selected, plus one for the key; finally \code{atomic} which contols whether the data frame columns are atomic or returned "as is", see \code{\link{I}}. The allowed values for the deserialization argument are \code{"raw"}, which means cells are text; \code{"typdebytes"}, which is a serialization format shared with other elements of the Hadoop system; \code{"native"} which is the native R format; or a function that takes a list of raw vectors and returns a list or vector of deserialized objects. In the case of cell.deserialize the function should take two additional argmuments for the names of family and column being deserialized }} + \item{\dots}{Additional arguments to the format function. For the \code{"csv"} format they detail the specifics of the CSV dialect to use and are the same as for \code{\link{read.table}} and \code{\link{write.table}} for the input and output resp, with the exception of \code{header, file, x, nrows, col.names} and \code{row.names}, the latter two allowed for input only. For \code{"json"}, only on the input side, one can specify a \code{key.class} and a \code{value.class} to help in mapping the JSON data model to R's own more flexibly. For the \code{"native"} and \code{"sequence.typedbytes"} output formats the user can specify a \code{keyval.length} that says how many values to map to a single physical key-value pair when the key is \code{NULL}. For the \code{"hbase"} format, the table name is provided as the input argument to \code{\link{mapreduce}}. Additional arguments are: \code{family.columns}, a named list where the names are family names and the elements are lists of column names within each family; \code{key.deserialize} and \code{cell.deserialize} that control the deserialization of keys and cells resp. and can take a string value or a function (explained below); \code{dense} which contols whether the data read from hbase is returned as a 4-column data frame (key, family, column and cell) or a number of columns equal to the number of columns selected, plus one for the key; finally \code{atomic} which contols whether the data frame columns are atomic or returned "as is", see \code{\link{I}}. The allowed values for the deserialization argument are \code{"raw"}, which means cells are text; \code{"typdebytes"}, which is a serialization format shared with other elements of the Hadoop system; \code{"native"} which is the native R format; or a function that takes a list of raw vectors and returns a list or vector of deserialized objects. In the case of \code{cell.deserialize} the function should take two additional argmuments for the names of family and column being deserialized. For the \code{avro} format (input only) there is one mandatory additional argument, \code{schema.file} that should provide the URL of a file containing an appropriate avro schema, can be the same as file to be read. The user needs to specify the protocol, for instance \code{file:} or \code{hdfs:} as part of the URL}} \details{ -The goal of these functions is to encapsulate some of the complexity of the IO settings, providing meaningful defaults and predefined combinations. If you don't want to deal with the full complexity of defining custom IO formats, there are prepackaged combinations. "text" is free text, useful mostly on the input side for NLP type applications; "json" is one or two tab separated, single line JSON objects per record; "csv" is the CSV format, configurable through additional arguments; "native.text" uses the internal R serialization in text mode, and was the default in previous releases, use only for backward compatibility; "native" uses the internal R serialization, offers the highest level of compatibility with R data types and is the default; "sequence.typedbytes" is a sequence file (in the Hadoop sense) where key and value are of type typedbytes, which is a simple serialization format used in connection with streaming for compatibility with other hadoop subsystems. Typedbytes is documented here \url{https://hadoop.apache.org/mapreduce/docs/current/api/org/apache/hadoop/typedbytes/package-summary.html}. "hbase" allows to read from (but not yet write to) an HBase table. This format should still considered experimental. Hadoop should be already configured to run streaming jobs on HBase tables \url{https://wiki.apache.org/hadoop/Hbase/MapReduce}. "pig.hive" is a variant of CSV to tranfer data to and from Hive or Pig, when using their default format \code{`ROW FORMAT DELIMITED FIELDS TERMINATED BY '001' LINES TERMINATED BY '\n'`}. +The goal of these functions is to encapsulate some of the complexity of the IO settings, providing meaningful defaults and predefined combinations. If you don't want to deal with the full complexity of defining custom IO formats, there are prepackaged combinations. "text" is free text, useful mostly on the input side for NLP type applications; "json" is one or two tab separated, single line JSON objects per record; "csv" is the CSV format, configurable through additional arguments; "native.text" uses the internal R serialization in text mode, and was the default in previous releases, use only for backward compatibility; "native" uses the internal R serialization, offers the highest level of compatibility with R data types and is the default; "sequence.typedbytes" is a sequence file (in the Hadoop sense) where key and value are of type typedbytes, which is a simple serialization format used in connection with streaming for compatibility with other hadoop subsystems. Typedbytes is documented here \url{https://hadoop.apache.org/mapreduce/docs/current/api/org/apache/hadoop/typedbytes/package-summary.html}. "hbase" allows to read from (but not yet write to) an HBase table. This format should still considered experimental. Hadoop should be already configured to run streaming jobs on HBase tables \url{https://wiki.apache.org/hadoop/Hbase/MapReduce}. "pig.hive" is a variant of CSV to tranfer data to and from Hive or Pig, when using their default format \code{`ROW FORMAT DELIMITED FIELDS TERMINATED BY '001' LINES TERMINATED BY '\n'`}. "avro" (input only) is the fromat defined by the Apache Avro project. If you want to implement custom formats, the input processing is the result of the composition of a Java class and an R function, and the same is true on the output side but in reverse order and you can specify both as arguments to this functions.} \value{ From 8171bc674715d66832a216cbfb8219f573398ce7 Mon Sep 17 00:00:00 2001 From: Antonio Piccolboni Date: Wed, 26 Feb 2014 14:24:59 -0800 Subject: [PATCH 03/18] add avro mr test --- pkg/tests/mapreduce.R | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/pkg/tests/mapreduce.R b/pkg/tests/mapreduce.R index 435b1951..643b7300 100644 --- a/pkg/tests/mapreduce.R +++ b/pkg/tests/mapreduce.R @@ -189,6 +189,33 @@ for (be in c("local", "hadoop")) { precondition = function(l) length(l) > 0, sample.size = 10) + #avro + + unit.test( + function(df) { + if(rmr.options("backend") == "local") TRUE + else { + tf1 = tempfile() + write.avro(df, tf1) + tf2 = tempfile() + hdfs.put(tf1, tf2) + isTRUE( + all.equal( + df, + values( + from.dfs( + mapreduce( + tf2, + format = + make.input.format( + format = "avro", + schema.file = paste("file", tf1, sep = ":"))))), + tolerance = 1e-4, + check.attributes = FALSE))}}, + generators = list(tdgg.data.frame()), + sample.size = 10) + + #equijoin stopifnot( all( From d663e20559ff6d9d8352312e5d18602b7d5e8142 Mon Sep 17 00:00:00 2001 From: Antonio Piccolboni Date: Thu, 27 Feb 2014 16:22:07 -0800 Subject: [PATCH 04/18] looks like the auto indent feature was a on a roll --- pkg/tests/mapreduce.R | 422 +++++++++++++++++++++--------------------- 1 file changed, 211 insertions(+), 211 deletions(-) diff --git a/pkg/tests/mapreduce.R b/pkg/tests/mapreduce.R index 643b7300..d20400fa 100644 --- a/pkg/tests/mapreduce.R +++ b/pkg/tests/mapreduce.R @@ -16,215 +16,215 @@ library(quickcheck) library(rmr2) for (be in c("local", "hadoop")) { - rmr.options(backend = be) - - - ## keyval compare - kv.cmp = function(kv1, kv2) { - kv1 = rmr2:::split.keyval(kv1) - kv2 = rmr2:::split.keyval(kv2) - o1 = order(unlist(keys(kv1))) - o2 = order(unlist(keys(kv2))) - isTRUE(all.equal(keys(kv1)[o1], keys(kv2)[o2], tolerance=1e-4, check.attributes=FALSE)) && - isTRUE(all.equal(values(kv1)[o1], values(kv2)[o2], tolerance=1e-4, check.attributes=FALSE)) } - - ##from.dfs to.dfs - ##native - unit.test( - function(kv) { - kv.cmp(kv, - from.dfs(to.dfs(kv)))}, - generators = list(rmr2:::tdgg.keyval()), - sample.size = 10) - - ## csv - unit.test( - function(df) { - isTRUE( - all.equal( - df, - values( - from.dfs( - to.dfs( - df, - format = "csv"), - format = make.input.format( - format = "csv"))), - tolerance = 1e-4, - check.attributes = FALSE))}, - generators = list(tdgg.data.frame()), - sample.size = 10) - - #json - fmt = "json" - unit.test( - function(df) { - isTRUE( - all.equal( - df, - values( - from.dfs( - to.dfs( - df, - format = fmt), - format = make.input.format("json", key.class = "list", value.class = "data.frame"))), - tolerance = 1e-4, - check.attributes = FALSE))}, - generators = list(tdgg.data.frame()), - sample.size = 10) - - #sequence.typedbytes - seq.tb.data.loss = - function(l) - rapply( - l, - function(x) if(class(x) == "raw") x else as.list(x), - how = "replace") - - fmt = "sequence.typedbytes" - unit.test( - function(l) { - isTRUE( - all.equal( - seq.tb.data.loss(l), - values( - from.dfs( - to.dfs( - keyval(1,l), - format = fmt), - format = fmt)), - tolerance = 1e-4, - check.attributes = FALSE))}, - generators = list(tdgg.list()), - precondition = function(l) length(l) > 0, - sample.size = 10) - - ##mapreduce - - ##simplest mapreduce, all default - unit.test(function(kv) { - if(rmr2:::length.keyval(kv) == 0) TRUE - else { - kv1 = from.dfs(mapreduce(input = to.dfs(kv))) - kv.cmp(kv, kv1)}}, - generators = list(rmr2:::tdgg.keyval()), - sample.size = 10) - - ##put in a reduce for good measure - unit.test(function(kv) { - if(length(kv) == 0) TRUE - else { - kv1 = from.dfs(mapreduce(input = to.dfs(kv), - reduce = to.reduce(identity))) - kv.cmp(kv, kv1)}}, - generators = list(rmr2:::tdgg.keyval()), - sample.size = 10) - - ## csv - unit.test( - function(df) { - df1 = - values( - from.dfs( - mapreduce( - to.dfs( - df, - format = "csv"), - input.format = "csv", - output.format = "csv"), - format = "csv")) - isTRUE( - all.equal( - df[do.call(order,df),], - df1[do.call(order,df1),], - tolerance = 1e-4, - check.attributes = FALSE))}, - generators = list(tdgg.data.frame()), - sample.size = 10) - - #json - # a more general test would be better for json but the subtleties of mapping R to to JSON are many - fmt = "json" - unit.test( - function(df) { - df1 = - values( - from.dfs( - mapreduce( - to.dfs( - df, - format = fmt), - input.format = make.input.format("json", key.class = "list", value.class = "data.frame"), - output.format = fmt), - format = make.input.format("json", key.class = "list", value.class = "data.frame"))) - isTRUE( - all.equal( - df[do.call(order,df),], - df1[do.call(order,df1),], - tolerance = 1e-4, - check.attributes = FALSE))}, - generators = list(tdgg.data.frame()), - sample.size = 10) - - #sequence.typedbytes - fmt = "sequence.typedbytes" - unit.test( - function(l) { - l = seq.tb.data.loss(l) - isTRUE( - all.equal( - l, - values( - from.dfs( - mapreduce( - to.dfs( - keyval(1,l), - format = fmt), - input.format = fmt, - output.format = fmt), - format = fmt)), - tolerance = 1e-4, - check.attributes = FALSE))}, - generators = list(tdgg.list()), - precondition = function(l) length(l) > 0, - sample.size = 10) - - #avro - - unit.test( - function(df) { - if(rmr.options("backend") == "local") TRUE - else { - tf1 = tempfile() - write.avro(df, tf1) - tf2 = tempfile() - hdfs.put(tf1, tf2) - isTRUE( - all.equal( - df, - values( - from.dfs( - mapreduce( - tf2, - format = - make.input.format( - format = "avro", - schema.file = paste("file", tf1, sep = ":"))))), - tolerance = 1e-4, - check.attributes = FALSE))}}, - generators = list(tdgg.data.frame()), - sample.size = 10) - - - #equijoin - stopifnot( - all( - apply( - values( - from.dfs( - equijoin( - left.input = to.dfs(keyval(1:10, (1:10)^2)), - right.input = to.dfs(keyval(1:10, (1:10)^3))))), - 1, - function(x) x[[1]]^(3/2) == x[[2]]))) + rmr.options(backend = be) + + + ## keyval compare + kv.cmp = function(kv1, kv2) { + kv1 = rmr2:::split.keyval(kv1) + kv2 = rmr2:::split.keyval(kv2) + o1 = order(unlist(keys(kv1))) + o2 = order(unlist(keys(kv2))) + isTRUE(all.equal(keys(kv1)[o1], keys(kv2)[o2], tolerance=1e-4, check.attributes=FALSE)) && + isTRUE(all.equal(values(kv1)[o1], values(kv2)[o2], tolerance=1e-4, check.attributes=FALSE)) } + + ##from.dfs to.dfs + ##native + unit.test( + function(kv) { + kv.cmp(kv, + from.dfs(to.dfs(kv)))}, + generators = list(rmr2:::tdgg.keyval()), + sample.size = 10) + + ## csv + unit.test( + function(df) { + isTRUE( + all.equal( + df, + values( + from.dfs( + to.dfs( + df, + format = "csv"), + format = make.input.format( + format = "csv"))), + tolerance = 1e-4, + check.attributes = FALSE))}, + generators = list(tdgg.data.frame()), + sample.size = 10) + + #json + fmt = "json" + unit.test( + function(df) { + isTRUE( + all.equal( + df, + values( + from.dfs( + to.dfs( + df, + format = fmt), + format = make.input.format("json", key.class = "list", value.class = "data.frame"))), + tolerance = 1e-4, + check.attributes = FALSE))}, + generators = list(tdgg.data.frame()), + sample.size = 10) + + #sequence.typedbytes + seq.tb.data.loss = + function(l) + rapply( + l, + function(x) if(class(x) == "raw") x else as.list(x), + how = "replace") + + fmt = "sequence.typedbytes" + unit.test( + function(l) { + isTRUE( + all.equal( + seq.tb.data.loss(l), + values( + from.dfs( + to.dfs( + keyval(1,l), + format = fmt), + format = fmt)), + tolerance = 1e-4, + check.attributes = FALSE))}, + generators = list(tdgg.list()), + precondition = function(l) length(l) > 0, + sample.size = 10) + + ##mapreduce + + ##simplest mapreduce, all default + unit.test(function(kv) { + if(rmr2:::length.keyval(kv) == 0) TRUE + else { + kv1 = from.dfs(mapreduce(input = to.dfs(kv))) + kv.cmp(kv, kv1)}}, + generators = list(rmr2:::tdgg.keyval()), + sample.size = 10) + + ##put in a reduce for good measure + unit.test(function(kv) { + if(length(kv) == 0) TRUE + else { + kv1 = from.dfs(mapreduce(input = to.dfs(kv), + reduce = to.reduce(identity))) + kv.cmp(kv, kv1)}}, + generators = list(rmr2:::tdgg.keyval()), + sample.size = 10) + + ## csv + unit.test( + function(df) { + df1 = + values( + from.dfs( + mapreduce( + to.dfs( + df, + format = "csv"), + input.format = "csv", + output.format = "csv"), + format = "csv")) + isTRUE( + all.equal( + df[do.call(order,df),], + df1[do.call(order,df1),], + tolerance = 1e-4, + check.attributes = FALSE))}, + generators = list(tdgg.data.frame()), + sample.size = 10) + + #json + # a more general test would be better for json but the subtleties of mapping R to to JSON are many + fmt = "json" + unit.test( + function(df) { + df1 = + values( + from.dfs( + mapreduce( + to.dfs( + df, + format = fmt), + input.format = make.input.format("json", key.class = "list", value.class = "data.frame"), + output.format = fmt), + format = make.input.format("json", key.class = "list", value.class = "data.frame"))) + isTRUE( + all.equal( + df[do.call(order,df),], + df1[do.call(order,df1),], + tolerance = 1e-4, + check.attributes = FALSE))}, + generators = list(tdgg.data.frame()), + sample.size = 10) + + #sequence.typedbytes + fmt = "sequence.typedbytes" + unit.test( + function(l) { + l = seq.tb.data.loss(l) + isTRUE( + all.equal( + l, + values( + from.dfs( + mapreduce( + to.dfs( + keyval(1,l), + format = fmt), + input.format = fmt, + output.format = fmt), + format = fmt)), + tolerance = 1e-4, + check.attributes = FALSE))}, + generators = list(tdgg.list()), + precondition = function(l) length(l) > 0, + sample.size = 10) + + #avro + + unit.test( + function(df) { + if(rmr.options("backend") == "local") TRUE + else { + tf1 = tempfile() + write.avro(df, tf1) + tf2 = tempfile() + hdfs.put(tf1, tf2) + isTRUE( + all.equal( + df, + values( + from.dfs( + mapreduce( + tf2, + format = + make.input.format( + format = "avro", + schema.file = paste("file", tf1, sep = ":"))))), + tolerance = 1e-4, + check.attributes = FALSE))}}, + generators = list(tdgg.data.frame()), + sample.size = 10) + + + #equijoin + stopifnot( + all( + apply( + values( + from.dfs( + equijoin( + left.input = to.dfs(keyval(1:10, (1:10)^2)), + right.input = to.dfs(keyval(1:10, (1:10)^3))))), + 1, + function(x) x[[1]]^(3/2) == x[[2]]))) } \ No newline at end of file From e455306af7d85d948d2c0290e7c048414f46753c Mon Sep 17 00:00:00 2001 From: Antonio Piccolboni Date: Tue, 4 Mar 2014 16:01:07 -0800 Subject: [PATCH 05/18] small errors --- pkg/tests/mapreduce.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/tests/mapreduce.R b/pkg/tests/mapreduce.R index d20400fa..affc7a5b 100644 --- a/pkg/tests/mapreduce.R +++ b/pkg/tests/mapreduce.R @@ -197,8 +197,8 @@ for (be in c("local", "hadoop")) { else { tf1 = tempfile() write.avro(df, tf1) - tf2 = tempfile() - hdfs.put(tf1, tf2) + tf2 = rmr2:::dfs.tempfile() + rmr2:::hdfs.put(tf1, tf2()) isTRUE( all.equal( df, @@ -206,7 +206,7 @@ for (be in c("local", "hadoop")) { from.dfs( mapreduce( tf2, - format = + input.format = make.input.format( format = "avro", schema.file = paste("file", tf1, sep = ":"))))), From 4305695bb62e86fbf7c0ca5c87f9cafb25b42e76 Mon Sep 17 00:00:00 2001 From: Antonio Piccolboni Date: Fri, 7 Mar 2014 15:01:39 -0800 Subject: [PATCH 06/18] apparently the .avro extension is needed for input format to work fails at the java level. No input to R if extension missing. Workaround is good for now so that we know other things are working --- pkg/tests/mapreduce.R | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/tests/mapreduce.R b/pkg/tests/mapreduce.R index affc7a5b..aa5e755d 100644 --- a/pkg/tests/mapreduce.R +++ b/pkg/tests/mapreduce.R @@ -196,16 +196,18 @@ for (be in c("local", "hadoop")) { if(rmr.options("backend") == "local") TRUE else { tf1 = tempfile() - write.avro(df, tf1) + ravro:::write.avro(df, tf1) tf2 = rmr2:::dfs.tempfile() - rmr2:::hdfs.put(tf1, tf2()) + tf3 = paste(tf2(), "avro", sep = ".") + rmr2:::hdfs.put(tf1, tf3) isTRUE( all.equal( df, values( from.dfs( mapreduce( - tf2, + tf3, + map = function(k,v) rmr.str(v), input.format = make.input.format( format = "avro", From e4d14f265aaba19186db3bd626c569984d39391b Mon Sep 17 00:00:00 2001 From: Antonio Piccolboni Date: Wed, 12 Mar 2014 13:06:32 -0700 Subject: [PATCH 07/18] option makes sure patched jars are picked up first --- pkg/R/IO.R | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/R/IO.R b/pkg/R/IO.R index e3b4266b..7a9f2ed3 100644 --- a/pkg/R/IO.R +++ b/pkg/R/IO.R @@ -361,7 +361,8 @@ make.input.format = paste( ravro:::AVRO_TOOLS, file.path(dirname(ravro:::AVRO_TOOLS), "avro-1.7.5.jar"), - sep= ",")))})} + sep= ","), + D = "mapreduce.task.classpath.user.precedence=true"))})} if(is.null(streaming.format) && mode == "binary") streaming.format = "org.apache.hadoop.streaming.AutoInputFormat" list( From 9d881c76f2bab7d7a49874b6997d0127ff1845b4 Mon Sep 17 00:00:00 2001 From: Antonio Piccolboni Date: Wed, 12 Mar 2014 13:34:21 -0700 Subject: [PATCH 08/18] back to cdh4 bundled version --- pkg/R/IO.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/R/IO.R b/pkg/R/IO.R index 7a9f2ed3..0226ebd6 100644 --- a/pkg/R/IO.R +++ b/pkg/R/IO.R @@ -360,7 +360,7 @@ make.input.format = libjars = paste( ravro:::AVRO_TOOLS, - file.path(dirname(ravro:::AVRO_TOOLS), "avro-1.7.5.jar"), + file.path(dirname(ravro:::AVRO_TOOLS), "avro-1.7.4.jar"), sep= ","), D = "mapreduce.task.classpath.user.precedence=true"))})} if(is.null(streaming.format) && mode == "binary") From 1db565a1d5262b25243fe3d39a81adcd666d1c3b Mon Sep 17 00:00:00 2001 From: Antonio Piccolboni Date: Thu, 13 Mar 2014 10:12:01 -0700 Subject: [PATCH 09/18] restructured for readability in lists. Merge with master will be problematic as the same has happened there already. Keep that version but add the avro entries when the time comes. --- pkg/man/make.io.format.Rd | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/pkg/man/make.io.format.Rd b/pkg/man/make.io.format.Rd index ab719ccf..29e7ca82 100644 --- a/pkg/man/make.io.format.Rd +++ b/pkg/man/make.io.format.Rd @@ -18,10 +18,26 @@ make.output.format(format = make.native.output.format( keyval.length = rmr.opt \item{format}{Either a string describing a predefined combination of IO settings (possibilities include: \code{"text"}, \code{"json"}, \code{"csv"}, \code{"native"},\code{"sequence.typedbytes"}, \code{"hbase"}, \code{"pig.hive"}) or a function. For an input format, this function accepts a connection and a number of records and returns a key-value pair (see \code{\link{keyval}}). For an output format, this function accepts a key-value pair and a connection and writes the former to the latter.} \item{mode}{Mode can be either \code{"text"} or \code{"binary"}, which tells R what type of connection to use when opening the IO connections.} \item{streaming.format}{Class to pass to hadoop streaming as \code{inputformat} or \code{outputformat} option. This class is the first in the input chain to perform its duties on the input side and the last on the output side. Right now this option is not honored in local mode.} - \item{\dots}{Additional arguments to the format function. For the \code{"csv"} format they detail the specifics of the CSV dialect to use and are the same as for \code{\link{read.table}} and \code{\link{write.table}} for the input and output resp, with the exception of \code{header, file, x, nrows, col.names} and \code{row.names}, the latter two allowed for input only. For \code{"json"}, only on the input side, one can specify a \code{key.class} and a \code{value.class} to help in mapping the JSON data model to R's own more flexibly. For the \code{"native"} and \code{"sequence.typedbytes"} output formats the user can specify a \code{keyval.length} that says how many values to map to a single physical key-value pair when the key is \code{NULL}. For the \code{"hbase"} format, the table name is provided as the input argument to \code{\link{mapreduce}}. Additional arguments are: \code{family.columns}, a named list where the names are family names and the elements are lists of column names within each family; \code{key.deserialize} and \code{cell.deserialize} that control the deserialization of keys and cells resp. and can take a string value or a function (explained below); \code{dense} which contols whether the data read from hbase is returned as a 4-column data frame (key, family, column and cell) or a number of columns equal to the number of columns selected, plus one for the key; finally \code{atomic} which contols whether the data frame columns are atomic or returned "as is", see \code{\link{I}}. The allowed values for the deserialization argument are \code{"raw"}, which means cells are text; \code{"typdebytes"}, which is a serialization format shared with other elements of the Hadoop system; \code{"native"} which is the native R format; or a function that takes a list of raw vectors and returns a list or vector of deserialized objects. In the case of \code{cell.deserialize} the function should take two additional argmuments for the names of family and column being deserialized. For the \code{avro} format (input only) there is one mandatory additional argument, \code{schema.file} that should provide the URL of a file containing an appropriate avro schema, can be the same as file to be read. The user needs to specify the protocol, for instance \code{file:} or \code{hdfs:} as part of the URL}} + \item{\dots}{Additional arguments to the format function, which depend on the format being defined. + \describe{ + \item{csv}{Additional arguments detail the specifics of the CSV dialect to use and are the same as for \code{\link{read.table}} and \code{\link{write.table}} for the input and output resp, with the exception of \code{header, file, x, nrows, col.names} and \code{row.names}, the latter two allowed for input only.} + \item{json}{Only for the input format, one can specify a \code{key.class} and a \code{value.class} to help in mapping the JSON data model to R's own more flexibly.} + \item{native and sequence.typedbytes}{The user can specify a \code{keyval.length} that says how many values to map to a single physical key-value pair when the key is \code{NULL}.} + \item{hbase}{The table name is provided as the input argument to \code{\link{mapreduce}}. Additional arguments are: \code{family.columns}, a named list where the names are family names and the elements are lists of column names within each family; \code{key.deserialize} and \code{cell.deserialize} that control the deserialization of keys and cells resp. and can take a string value or a function (explained below); \code{dense} which contols whether the data read from hbase is returned as a 4-column data frame (key, family, column and cell) or a number of columns equal to the number of columns selected, plus one for the key; finally \code{atomic} which contols whether the data frame columns are atomic or returned "as is", see \code{\link{I}}. The allowed values for the deserialization argument are \code{"raw"}, which means cells are text; \code{"typdebytes"}, which is a serialization format shared with other elements of the Hadoop system; \code{"native"} which is the native R format; or a function that takes a list of raw vectors and returns a list or vector of deserialized objects. In the case of \code{cell.deserialize} the function should take two additional argmuments for the names of family and column being deserialized.} + \item{avro}{(input only) It has one mandatory additional argument, \code{schema.file} that should provide the URL of a file containing an appropriate avro schema, can be the same as file to be read. The user can specify the protocol, for instance \code{file:} or \code{hdfs:} as part of the URL, with the first being the default.}}}} \details{ -The goal of these functions is to encapsulate some of the complexity of the IO settings, providing meaningful defaults and predefined combinations. If you don't want to deal with the full complexity of defining custom IO formats, there are prepackaged combinations. "text" is free text, useful mostly on the input side for NLP type applications; "json" is one or two tab separated, single line JSON objects per record; "csv" is the CSV format, configurable through additional arguments; "native.text" uses the internal R serialization in text mode, and was the default in previous releases, use only for backward compatibility; "native" uses the internal R serialization, offers the highest level of compatibility with R data types and is the default; "sequence.typedbytes" is a sequence file (in the Hadoop sense) where key and value are of type typedbytes, which is a simple serialization format used in connection with streaming for compatibility with other hadoop subsystems. Typedbytes is documented here \url{https://hadoop.apache.org/mapreduce/docs/current/api/org/apache/hadoop/typedbytes/package-summary.html}. "hbase" allows to read from (but not yet write to) an HBase table. This format should still considered experimental. Hadoop should be already configured to run streaming jobs on HBase tables \url{https://wiki.apache.org/hadoop/Hbase/MapReduce}. "pig.hive" is a variant of CSV to tranfer data to and from Hive or Pig, when using their default format \code{`ROW FORMAT DELIMITED FIELDS TERMINATED BY '001' LINES TERMINATED BY '\n'`}. "avro" (input only) is the fromat defined by the Apache Avro project. +The goal of these functions is to encapsulate some of the complexity of the IO settings, providing meaningful defaults and predefined combinations. If you don't want to deal with the full complexity of defining custom IO formats, there are prepackaged combinations. +\describe{ +\item{text}{is free text, useful mostly on the input side for NLP type applications} +\item{json}{is one or two tab separated, single line JSON objects per record} +\item{csv}{is the CSV format, configurable through additional arguments} +\item{native.text}{uses the internal R serialization in text mode, and was the default in previous releases, use only for backward compatibility} +\item{native}{uses the internal R serialization, offers the highest level of compatibility with R data types and is the default} +\item{sequence.typedbytes}{is a sequence file (in the Hadoop sense) where key and value are of type typedbytes, which is a simple serialization format used in connection with streaming for compatibility with other hadoop subsystems. Typedbytes is documented here \url{https://hadoop.apache.org/mapreduce/docs/current/api/org/apache/hadoop/typedbytes/package-summary.html}.} +\item{hbase}{allows to read from (but not yet write to) an HBase table. This format should still considered experimental. Hadoop should be already configured to run streaming jobs on HBase tables \url{https://wiki.apache.org/hadoop/Hbase/MapReduce}.} +\item{"pig.hive"}{is a variant of CSV to tranfer data to and from Hive or Pig, when using their default format \code{`ROW FORMAT DELIMITED FIELDS TERMINATED BY '001' LINES TERMINATED BY '\n'`}.} +\item{avro}{(input only) is the format defined by the Apache Avro project.}} If you want to implement custom formats, the input processing is the result of the composition of a Java class and an R function, and the same is true on the output side but in reverse order and you can specify both as arguments to this functions.} \value{ From 79bea0fa6c0347b7b665c61cd27a9dcaeeff72bc Mon Sep 17 00:00:00 2001 From: Antonio Piccolboni Date: Thu, 13 Mar 2014 16:41:37 -0700 Subject: [PATCH 10/18] revert to using system jars --- pkg/R/IO.R | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/pkg/R/IO.R b/pkg/R/IO.R index 0226ebd6..071ae72b 100644 --- a/pkg/R/IO.R +++ b/pkg/R/IO.R @@ -352,17 +352,7 @@ make.input.format = avro = { format = make.avro.input.format.function(optlist$schema.file) mode = "text" - streaming.format = "org.apache.avro.mapred.AvroAsTextInputFormat" - backend.parameters = - list( - hadoop = - list( - libjars = - paste( - ravro:::AVRO_TOOLS, - file.path(dirname(ravro:::AVRO_TOOLS), "avro-1.7.4.jar"), - sep= ","), - D = "mapreduce.task.classpath.user.precedence=true"))})} + streaming.format = "org.apache.avro.mapred.AvroAsTextInputFormat"})} if(is.null(streaming.format) && mode == "binary") streaming.format = "org.apache.hadoop.streaming.AutoInputFormat" list( From 962ffb141fe828060094e2d00e968bec4072877c Mon Sep 17 00:00:00 2001 From: Antonio Piccolboni Date: Fri, 14 Mar 2014 16:11:03 -0700 Subject: [PATCH 11/18] more expressive message when hitting the Nan or Inf issue fixes #4 --- pkg/R/IO.R | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/R/IO.R b/pkg/R/IO.R index 071ae72b..a50f6515 100644 --- a/pkg/R/IO.R +++ b/pkg/R/IO.R @@ -264,7 +264,13 @@ make.keyval.writer = Curry(make.keyval.readwriter, keyval.length = NULL, read = paste.fromJSON = function(...) - rjson::fromJSON(paste("[", paste(..., sep = ","), "]")) + tryCatch( + rjson::fromJSON(paste("[", paste(..., sep = ","), "]")), + error = + function(e){ + if(is.element(e$message, paste0("unexpected character '", c("N", "I"), "'\n"))) + e$message = ("Found unexpected character, try updating Avro to 1.7.7 or trunk") + stop(e$message)}) make.avro.input.format.function = function(schema.file) { @@ -352,7 +358,13 @@ make.input.format = avro = { format = make.avro.input.format.function(optlist$schema.file) mode = "text" - streaming.format = "org.apache.avro.mapred.AvroAsTextInputFormat"})} + streaming.format = "org.apache.avro.mapred.AvroAsTextInputFormat" + backend.parameters = + list( + hadoop = + list( + libjars = + ravro:::AVRO_TOOLS))})} if(is.null(streaming.format) && mode == "binary") streaming.format = "org.apache.hadoop.streaming.AutoInputFormat" list( From 76dcff903ab60dc272bb8a330b3ffdabafda066f Mon Sep 17 00:00:00 2001 From: Antonio Piccolboni Date: Fri, 14 Mar 2014 16:19:01 -0700 Subject: [PATCH 12/18] rely on env variable to find avro jars --- pkg/R/IO.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/R/IO.R b/pkg/R/IO.R index a50f6515..9e77fac8 100644 --- a/pkg/R/IO.R +++ b/pkg/R/IO.R @@ -364,7 +364,7 @@ make.input.format = hadoop = list( libjars = - ravro:::AVRO_TOOLS))})} + gsub(":", ",", Sys.getenv("AVRO_LIBS"))))})} if(is.null(streaming.format) && mode == "binary") streaming.format = "org.apache.hadoop.streaming.AutoInputFormat" list( From 3f7411ea1850a2a09a1e327526e667d89dbc07ed Mon Sep 17 00:00:00 2001 From: Jamie Olson Date: Tue, 18 Mar 2014 12:05:58 -0400 Subject: [PATCH 13/18] Adding wrapper tools for testing avro files and transitioning avro mapreduce tests to a separate file. --- pkg/tests/avro.R | 55 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 pkg/tests/avro.R diff --git a/pkg/tests/avro.R b/pkg/tests/avro.R new file mode 100644 index 00000000..3ca85c2c --- /dev/null +++ b/pkg/tests/avro.R @@ -0,0 +1,55 @@ +# Copyright 2011 Revolution Analytics +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +library(rmr2) +library(testthat) +library(ravro) + +rmr.options(backend = "hadoop") +test_avro_rmr <- function(df,test,...) { + if(rmr.options("backend") == "local") TRUE + else { + tf1 = tempfile(fileext=".avro") + expect_true(ravro:::write.avro(df, tf1,...)) + tf2 = rmr2:::dfs.tempfile() + tf3 = paste(tf2(), "data.avro", sep = "/") + rmr2:::hdfs.mkdir(tf2()) + rmr2:::hdfs.put(tf1, tf3) + + retdf <- values( + from.dfs( + mapreduce( + tf2(), + map = function(k,v) rmr.str(v), + input.format = + make.input.format( + format = "avro", + schema.file = tf1)))) + attributes(retdf) <- attributes(retdf)[names(attributes(df))] + test(retdf) + }} + +expect_equal_avro_rmr <- function(df,...) + test_avro_rmr(df,function(x)expect_equal(x,df),...) +expect_equivalent_avro_rmr <- function(df,...) + test_avro_rmr(df,function(x)expect_equivalent(x,df),...) + +expect_equivalent_avro_rmr( + read.avro(system.file("data/yield1k.avro",package="ravro")), + unflatten=T) + +d <- data.frame(x = 1, + y = as.factor(1:10), + fac = as.factor(sample(letters[1:3], 10, replace = TRUE))) +expect_equivalent_avro_rmr(d) From d3877cb901ea8a25c2385abadb72f32b9cb49296 Mon Sep 17 00:00:00 2001 From: Antonio Piccolboni Date: Tue, 18 Mar 2014 21:07:48 -0700 Subject: [PATCH 14/18] for some reason file: protocol not understood --- pkg/tests/mapreduce.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/tests/mapreduce.R b/pkg/tests/mapreduce.R index aa5e755d..9c8499f3 100644 --- a/pkg/tests/mapreduce.R +++ b/pkg/tests/mapreduce.R @@ -211,7 +211,7 @@ for (be in c("local", "hadoop")) { input.format = make.input.format( format = "avro", - schema.file = paste("file", tf1, sep = ":"))))), + schema.file = tf1)))), #paste("file", tf1, sep = ":"))))), tolerance = 1e-4, check.attributes = FALSE))}}, generators = list(tdgg.data.frame()), From 856e3769a7483748797f82fe365fd848aa7feb50 Mon Sep 17 00:00:00 2001 From: Antonio Piccolboni Date: Thu, 20 Mar 2014 10:26:39 -0700 Subject: [PATCH 15/18] deal with slightly different format for -Inf --- pkg/R/IO.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/R/IO.R b/pkg/R/IO.R index 9e77fac8..ca1289b4 100644 --- a/pkg/R/IO.R +++ b/pkg/R/IO.R @@ -268,7 +268,7 @@ paste.fromJSON = rjson::fromJSON(paste("[", paste(..., sep = ","), "]")), error = function(e){ - if(is.element(e$message, paste0("unexpected character '", c("N", "I"), "'\n"))) + if(is.element(e$message, paste0("unexpected character", c(" 'N'", " 'I'", ": I"), "\n"))) e$message = ("Found unexpected character, try updating Avro to 1.7.7 or trunk") stop(e$message)}) From 18e3b2adc0cb5bd8baf6a7f963500917233dfa9f Mon Sep 17 00:00:00 2001 From: Jamie Olson Date: Mon, 24 Mar 2014 11:05:39 -0400 Subject: [PATCH 16/18] Pass additional arguments through the avro input format to `parse_avro` --- pkg/R/IO.R | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/R/IO.R b/pkg/R/IO.R index 9e77fac8..9682d10a 100644 --- a/pkg/R/IO.R +++ b/pkg/R/IO.R @@ -273,7 +273,7 @@ paste.fromJSON = stop(e$message)}) make.avro.input.format.function = - function(schema.file) { + function(schema.file,...) { schema = ravro:::avro_get_schema(file=schema.file) function(con, n) { lines = @@ -281,7 +281,8 @@ make.avro.input.format.function = if (length(lines) == 0) NULL else { x = splat(paste.fromJSON)(lines) - y = ravro:::parse_avro(x, schema,encoded_unions=FALSE) + y = ravro:::parse_avro(x, schema,encoded_unions=FALSE, + ...) keyval(NULL, y)}}} IO.formats = c("text", "json", "csv", "native", @@ -356,7 +357,7 @@ make.input.format = collapse = " ")))), libjars = system.file(package = "rmr2", "hadoopy_hbase.jar")))}, avro = { - format = make.avro.input.format.function(optlist$schema.file) + format = make.avro.input.format.function(optlist$schema.file,...) mode = "text" streaming.format = "org.apache.avro.mapred.AvroAsTextInputFormat" backend.parameters = From 5b71cd4994106dfcf1fac59997a73c2f983f31c0 Mon Sep 17 00:00:00 2001 From: Jamie Olson Date: Fri, 28 Mar 2014 15:25:33 -0400 Subject: [PATCH 17/18] Passing additional variables through to ravro:::parse_avro for "avro" input format --- pkg/R/IO.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/R/IO.R b/pkg/R/IO.R index e58519cd..75c840b0 100644 --- a/pkg/R/IO.R +++ b/pkg/R/IO.R @@ -357,7 +357,7 @@ make.input.format = collapse = " ")))), libjars = system.file(package = "rmr2", "hadoopy_hbase.jar")))}, avro = { - format = make.avro.input.format.function(optlist$schema.file,...) + format = make.avro.input.format.function(...) mode = "text" streaming.format = "org.apache.avro.mapred.AvroAsTextInputFormat" backend.parameters = From 2f43f0e3f147647d7b014fa7b1095b98e83d0b38 Mon Sep 17 00:00:00 2001 From: Jamie Olson Date: Fri, 28 Mar 2014 15:25:59 -0400 Subject: [PATCH 18/18] Extended avro testing --- pkg/tests/avro.R | 212 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 201 insertions(+), 11 deletions(-) diff --git a/pkg/tests/avro.R b/pkg/tests/avro.R index 3ca85c2c..0b7b65f7 100644 --- a/pkg/tests/avro.R +++ b/pkg/tests/avro.R @@ -17,39 +17,229 @@ library(testthat) library(ravro) rmr.options(backend = "hadoop") -test_avro_rmr <- function(df,test,...) { +test_avro_rmr <- function(df,test,write.args=list(),input.format.args=list(),map=function(k,v)rmr.str(v)) { if(rmr.options("backend") == "local") TRUE else { tf1 = tempfile(fileext=".avro") - expect_true(ravro:::write.avro(df, tf1,...)) + expect_true(do.call(ravro:::write.avro,c(list(df, tf1),write.args))) tf2 = rmr2:::dfs.tempfile() tf3 = paste(tf2(), "data.avro", sep = "/") rmr2:::hdfs.mkdir(tf2()) rmr2:::hdfs.put(tf1, tf3) - + df.input.format <- do.call(make.input.format, + c(list( + format = "avro", + schema.file = tf1), + input.format.args)) retdf <- values( from.dfs( mapreduce( tf2(), - map = function(k,v) rmr.str(v), - input.format = - make.input.format( - format = "avro", - schema.file = tf1)))) + map = map, + input.format = df.input.format))) + retdf <- retdf[row.names(df),] attributes(retdf) <- attributes(retdf)[names(attributes(df))] test(retdf) }} -expect_equal_avro_rmr <- function(df,...) +expect_equal_avro_rmr <- function(df,...){ + row.names(df) <- row.names(df) # rmr2 uses row.names function which coerces to character + # We need to make sure row.names for x is character or else this will always fail test_avro_rmr(df,function(x)expect_equal(x,df),...) +} + expect_equivalent_avro_rmr <- function(df,...) test_avro_rmr(df,function(x)expect_equivalent(x,df),...) expect_equivalent_avro_rmr( - read.avro(system.file("data/yield1k.avro",package="ravro")), - unflatten=T) + read.avro(system.file("data/yield1k.avro",package="ravro")),write.args=list(unflatten=T)) d <- data.frame(x = 1, y = as.factor(1:10), fac = as.factor(sample(letters[1:3], 10, replace = TRUE))) expect_equivalent_avro_rmr(d) + + +########################################################################################## + +context("Basic Avro Read/Write") + +### Handeling Factors +# Warnings: Factor levels converted to valid Avro names + +test_that("Handling factors", { + # Factors with non-"name" levels should still work + d <- data.frame(x = 1, + y = as.factor(1:10), + fac = as.factor(sample(letters[1:3], 10, replace = TRUE))) + expect_equal_avro_rmr(d) +}) + + +### Type Translation + +test_that("type translation", { + # All types should translate successfully + L3 <- LETTERS[1:3] + fac <- sample(L3, 10, replace = TRUE) + d <- data.frame(x = 1, y = 1:10, fac = fac, b = rep(c(TRUE, FALSE),5), c = rep(NA, 10), + stringsAsFactors=FALSE) + expect_equal_avro_rmr(d) + + d <- data.frame(x = 1, y = 1:10, fac = factor(fac,levels=L3), + b = rep(c(TRUE, FALSE),5), c = rep(NA, 10), + stringsAsFactors=FALSE) + expect_equal_avro_rmr(d) +}) + +### write can handle missing values + +test_that("write can handle missing values", { + # NA column (entirely "null" in Avro) + d <- data.frame(x = 1, + y = 1:10, + b = rep(c(TRUE, FALSE),5), + c = rep(NA, 10), + stringsAsFactors=FALSE) + expect_equal_avro_rmr(d) + + # NA row (entirely "null" in Avro) + d <- rbind(data.frame(x = 1, + y = 1:10, + b = rep(c(TRUE, FALSE),5)), + rep(NA,3)) + expect_equal_avro_rmr(d) +}) + +### NaNs throw warning + +test_that("NaNs throw warning", { + # NaN row (entirely "null" in Avro) + d <- rbind(data.frame(x = 1, + y = 1:10, + b = rep(c(TRUE, FALSE),5)), + rep(NaN,3)) + d[nrow(d),] <- NA + expect_equal_avro_rmr(d) + + # NaN row (entirely "null" in Avro) + d <- cbind(data.frame(x = 1, + y = 1:10, + b = rep(c(TRUE, FALSE),5)), + c=rep(NaN,10)) + d[,ncol(d)] <- as.numeric(NA) # coerce this type + expect_equal_avro_rmr(d) +}) + +### write.avro throws error on infinite values +## Infinite values cannot be serialied to Avro (which is good, what test verifies) + +test_that("write.avro throws error on infinite values", { + d <- rbind(data.frame(x = 1, y = 1:10, b = rep(c(TRUE, FALSE),5)), rep(NA,3), + c(Inf, 11, TRUE, NA)) + expect_that(expect_equal_avro_rmr(d), throws_error()) + + d <- rbind(data.frame(x = 1, y = 1:10, b = rep(c(TRUE, FALSE),5)), rep(NA,3), + c(-Inf, 11, TRUE, NA)) + expect_that(expect_equal_avro_rmr(d), throws_error()) +}) + +############################ Read/Write mtcars and iris ############################### + +context("Read/Write mtcars and iris") + +### mtcars round trip + +test_that("mtcars round trip", { + expect_equal_avro_rmr(mtcars) +}) + + +### factors level that are not Avro names read/write +## mttmp equivalent despite refactorization (good, warnings) +# 1: In (function (x, name = NULL, namespace = NULL, is.union = F, row.names = T, : +# Factor levels converted to valid Avro names: _3_ravro, _4_ravro, _5_ravro + +test_that("factors level that are not Avro names read/write", { + mttmp <- mtcars + mttmp$gear_factor <- as.factor(mttmp$gear) + expect_equal_avro_rmr(mttmp) +}) + + +### iris round trip +## iris_avro not equivalent +# Length mismatch: comparison on first 3 components + +test_that("iris round trip", { + # This doesn't work, because rmr2::from.dfs uses rbind to combine the values together + #expect_equal_avro_rmr(iris,write.args=list(unflatten=T),input.format.args=list(flatten=F)) + + expect_equal_avro_rmr(iris,write.args=list(unflatten=T),input.format.args=list(flatten=T)) +}) + +############################### Complex Example File ################################### + +context("Complex example file") + +complex.file.loc <- file.path(system.file("data",package="ravro"), + "complex.avro") + +### flattening works +## complex is not equivalent +# Error retrieving schema. +# Verify that the file exists and is a valid Avro: /tmp/RtmpPzKpBf/file1e4d2b5588fc.avro +## complex still not working with flatten=F and without flatten parameter + +test_that("flattening works", { + ## The complex data set is not yet supported by ravro:::write.avro + complex <- read.avro(complex.file.loc,flatten=T) + #expect_equal_avro_rmr(complex) + + complex <- read.avro(complex.file.loc, flatten=F) + #expect_equal_avro_rmr(complex) +}) + + + +################################# Read Example Avro Files ##################################### + +context("Read Example Avro Files") + +### yield reads unflattended correctly + +test_that("yield reads unflattened correctly", { + #yield <- read.avro(file.path(system.file("data",package="ravro"),"yield1k.avro"),flatten=F) + # rmr2:::from.dfs cannot combine un-flattened data values + #expect_equal_avro_rmr(yield) +}) + + +### yield reads flattened correctly +# equivalent, but not equal, because rmr2 inserts a "class" attribute with value "AsIs" for the nonStandardData + +test_that("yield reads flattened correctly", { + yield <- read.avro(file.path(system.file("data",package="ravro"),"yield1k.avro"),flatten=T) + expect_equivalent_avro_rmr(yield,write.args=list(unflatten=T)) +}) + + +### as-planted reads unflattened correctly + +test_that("as-planted reads unflattened correctly", { + #as_planted <- read.avro(file.path(system.file("data",package="ravro"),"as-planted1k.avro"), + flatten=F) + # rmr2:::from.dfs cannot combine un-flattened data values + #expect_equivalent_avro_rmr(as_planted) +}) + + +### as-planted reads flattened correctly +# equivalent, but not equal, because rmr2 inserts a "class" attribute with value "AsIs" for the nonStandardData + +test_that("as-planted reads flattened correctly", { + as_planted <- read.avro(file.path(system.file("data",package="ravro"),"as-planted1k.avro"), + flatten=T) + expect_equivalent_avro_rmr(as_planted,write.args=list(unflatten=T)) +}) +