diff --git a/pkg/DESCRIPTION b/pkg/DESCRIPTION index 37d5ef19..ed86a0e9 100644 --- a/pkg/DESCRIPTION +++ b/pkg/DESCRIPTION @@ -5,7 +5,7 @@ Version: 2.3.0 Date: 2013-10-1 Author: Revolution Analytics Depends: R (>= 2.6.0), Rcpp, RJSONIO (>= 0.8-2), bitops, digest, functional, stringr, plyr, reshape2 -Suggests: quickcheck +Suggests: quickcheck, ravro Collate: basic.R keyval.R IO.R local.R streaming.R mapreduce.R extras.R quickcheck-rmr.R parse-url.R Maintainer: Revolution Analytics Description: Supports the map reduce programming model on top of hadoop streaming diff --git a/pkg/R/IO.R b/pkg/R/IO.R index 57fac652..75c840b0 100644 --- a/pkg/R/IO.R +++ b/pkg/R/IO.R @@ -14,411 +14,445 @@ make.json.input.format = - function(key.class = rmr2:::qw(list, vector, data.frame, matrix), - value.class = rmr2:::qw(list, vector, data.frame, matrix)) { #leave the pkg qualifier in here - key.class = match.arg(key.class) - value.class = match.arg(value.class) - cast = - function(class) - switch( - class, - list = identity, - vector = as.vector, - data.frame = function(x) do.call(data.frame, x), - matrix = function(x) do.call(rbind, x)) - process.field = - function(field, class) - cast(class)(fromJSON(field, asText = TRUE)) - function(con, keyval.length) { - lines = readLines(con, keyval.length) - if (length(lines) == 0) NULL - else { - splits = strsplit(lines, "\t") - c.keyval( - lapply(splits, - function(x) - if(length(x) == 1) - keyval(NULL, process.field(x[1], value.class)) - else - keyval(process.field(x[1], key.class), process.field(x[2], value.class))))}}} + function(key.class = rmr2:::qw(list, vector, data.frame, matrix), + value.class = rmr2:::qw(list, vector, data.frame, matrix)) { #leave the pkg qualifier in here + key.class = match.arg(key.class) + value.class = match.arg(value.class) + cast = + function(class) + switch( + class, + list = identity, + vector = as.vector, + data.frame = function(x) do.call(data.frame, x), + matrix = function(x) do.call(rbind, x)) + process.field = + function(field, class) + cast(class)(fromJSON(field, asText = TRUE)) + function(con, keyval.length) { + lines = readLines(con, keyval.length) + if (length(lines) == 0) NULL + else { + splits = strsplit(lines, "\t") + c.keyval( + lapply(splits, + function(x) + if(length(x) == 1) + keyval(NULL, process.field(x[1], value.class)) + else + keyval(process.field(x[1], key.class), process.field(x[2], value.class))))}}} json.output.format = function(kv, con) { - ser = function(k, v) paste(gsub("\n", "", toJSON(k, .escapeEscapes=TRUE, collapse = "")), - gsub("\n", "", toJSON(v, .escapeEscapes=TRUE, collapse = "")), - sep = "\t") - out = reduce.keyval(kv, ser, rmr.options('keyval.length')) - writeLines(paste(out, collapse = "\n"), sep = "", con = con)} + ser = function(k, v) paste(gsub("\n", "", toJSON(k, .escapeEscapes=TRUE, collapse = "")), + gsub("\n", "", toJSON(v, .escapeEscapes=TRUE, collapse = "")), + sep = "\t") + out = reduce.keyval(kv, ser, rmr.options('keyval.length')) + writeLines(paste(out, collapse = "\n"), sep = "", con = con)} text.input.format = function(con, keyval.length) { - lines = readLines(con, keyval.length) - if (length(lines) == 0) NULL - else keyval(NULL, lines)} + lines = readLines(con, keyval.length) + if (length(lines) == 0) NULL + else keyval(NULL, lines)} text.output.format = function(kv, con) { - ser = function(k, v) paste(k, v, collapse = "", sep = "\t") - out = reduce.keyval(kv, ser, length.keyval(kv)) - writeLines(paste(out, "\n", collapse="", sep = ""), sep = "", con = con)} + ser = function(k, v) paste(k, v, collapse = "", sep = "\t") + out = reduce.keyval(kv, ser, length.keyval(kv)) + writeLines(paste(out, "\n", collapse="", sep = ""), sep = "", con = con)} make.csv.input.format = function(...) function(con, keyval.length) { - df = - tryCatch( - read.table(file = con, nrows = keyval.length, header = FALSE, ...), - error = - function(e) { - if(e$message != "no lines available in input") - stop(e$message) - NULL}) - if(is.null(df) || dim(df)[[1]] == 0) NULL - else keyval(NULL, df)} + df = + tryCatch( + read.table(file = con, nrows = keyval.length, header = FALSE, ...), + error = + function(e) { + if(e$message != "no lines available in input") + stop(e$message) + NULL}) + if(is.null(df) || dim(df)[[1]] == 0) NULL + else keyval(NULL, df)} make.csv.output.format = function(...) function(kv, con) { - kv = recycle.keyval(kv) - k = keys(kv) - v = values(kv) - write.table(file = con, - x = if(is.null(k)) v else cbind(k, v), - ..., - row.names = FALSE, - col.names = FALSE)} + kv = recycle.keyval(kv) + k = keys(kv) + v = values(kv) + write.table(file = con, + x = if(is.null(k)) v else cbind(k, v), + ..., + row.names = FALSE, + col.names = FALSE)} typedbytes.reader = function(data, nobjs) { - if(is.null(data)) NULL - else - .Call("typedbytes_reader", data, nobjs, PACKAGE = "rmr2")} + if(is.null(data)) NULL + else + .Call("typedbytes_reader", data, nobjs, PACKAGE = "rmr2")} typedbytes.writer = function(objects, con, native) { - writeBin( - .Call("typedbytes_writer", objects, native, PACKAGE = "rmr2"), - con)} + writeBin( + .Call("typedbytes_writer", objects, native, PACKAGE = "rmr2"), + con)} make.typedbytes.input.format = function(recycle = TRUE) { - obj.buffer = list() - obj.buffer.rmr.length = 0 - raw.buffer = raw() - read.size = 100 - function(con, keyval.length) { - while(length(obj.buffer) < 2 || - obj.buffer.rmr.length < keyval.length) { - raw.buffer <<- c(raw.buffer, readBin(con, raw(), read.size)) - if(length(raw.buffer) == 0) break; - parsed = typedbytes.reader(raw.buffer, as.integer(read.size/2)) - obj.buffer <<- c(obj.buffer, parsed$objects) - approx.read.records = { - if(length(parsed$objects) == 0) 0 - else - sum( - sapply(sample(parsed$objects, 10, replace = T), rmr.length)) * - length(parsed$objects)/10.0 } - obj.buffer.rmr.length <<- obj.buffer.rmr.length + approx.read.records - read.size <<- ceiling(1.1^sign(keyval.length - obj.buffer.rmr.length) * read.size) - if(parsed$length != 0) raw.buffer <<- raw.buffer[-(1:parsed$length)]} - straddler = list() - retval = - if(length(obj.buffer) == 0) NULL - else { - if(length(obj.buffer)%%2 ==1) { - straddler = obj.buffer[length(obj.buffer)] - obj.buffer <<- obj.buffer[-length(obj.buffer)]} - kk = odd(obj.buffer) - vv = even(obj.buffer) - if(recycle) { - keyval( - c.or.rbind.rep(kk, sapply.rmr.length(vv)), - c.or.rbind(vv))} - else { - keyval(kk, vv)}} - obj.buffer <<- straddler - obj.buffer.rmr.length <<- 0 - retval}} - + obj.buffer = list() + obj.buffer.rmr.length = 0 + raw.buffer = raw() + read.size = 100 + function(con, keyval.length) { + while(length(obj.buffer) < 2 || + obj.buffer.rmr.length < keyval.length) { + raw.buffer <<- c(raw.buffer, readBin(con, raw(), read.size)) + if(length(raw.buffer) == 0) break; + parsed = typedbytes.reader(raw.buffer, as.integer(read.size/2)) + obj.buffer <<- c(obj.buffer, parsed$objects) + approx.read.records = { + if(length(parsed$objects) == 0) 0 + else + sum( + sapply(sample(parsed$objects, 10, replace = T), rmr.length)) * + length(parsed$objects)/10.0 } + obj.buffer.rmr.length <<- obj.buffer.rmr.length + approx.read.records + read.size <<- ceiling(1.1^sign(keyval.length - obj.buffer.rmr.length) * read.size) + if(parsed$length != 0) raw.buffer <<- raw.buffer[-(1:parsed$length)]} + straddler = list() + retval = + if(length(obj.buffer) == 0) NULL + else { + if(length(obj.buffer)%%2 ==1) { + straddler = obj.buffer[length(obj.buffer)] + obj.buffer <<- obj.buffer[-length(obj.buffer)]} + kk = odd(obj.buffer) + vv = even(obj.buffer) + if(recycle) { + keyval( + c.or.rbind.rep(kk, sapply.rmr.length(vv)), + c.or.rbind(vv))} + else { + keyval(kk, vv)}} + obj.buffer <<- straddler + obj.buffer.rmr.length <<- 0 + retval}} + make.native.input.format = make.typedbytes.input.format make.native.or.typedbytes.output.format = - function(keyval.length, native) - function(kv, con){ - kvs = split.keyval(kv, keyval.length) - typedbytes.writer(interleave(keys(kvs), values(kvs)), con, native)} + function(keyval.length, native) + function(kv, con){ + kvs = split.keyval(kv, keyval.length) + typedbytes.writer(interleave(keys(kvs), values(kvs)), con, native)} make.native.output.format = Curry(make.native.or.typedbytes.output.format, native = TRUE) make.typedbytes.output.format = Curry(make.native.or.typedbytes.output.format, native = FALSE) pRawToChar = - function(rl) - .Call("raw_list_to_character", rl, PACKAGE="rmr2") + function(rl) + .Call("raw_list_to_character", rl, PACKAGE="rmr2") hbase.rec.to.data.frame = - function( - source, - atomic, - dense, - key.deserialize = pRawToChar, - cell.deserialize = - function(x, column, family) pRawToChar(x)) { - filler = replicate(length(unlist(source))/2, NULL) - dest = - list( - key = filler, - family = filler, - column = filler, - cell = filler) - tmp = - .Call( - "hbase_to_df", - source, - dest, - PACKAGE="rmr2") - retval = data.frame( - key = - I( - key.deserialize( - tmp$data.frame$key[1:tmp$nrows])), - family = - pRawToChar( - tmp$data.frame$family[1:tmp$nrows]), - column = - pRawToChar( - tmp$data.frame$column[1:tmp$nrows]), - cell = - I( - cell.deserialize( - tmp$data.frame$cell[1:tmp$nrows], - tmp$data.frame$family[1:tmp$nrows], - tmp$data.frame$column[1:tmp$nrows]))) - if(atomic) - retval = - as.data.frame( - lapply( - retval, - function(x) if(is.factor(x)) x else unclass(x))) - if(dense) retval = dcast(retval, key ~ family + column) - retval} + function( + source, + atomic, + dense, + key.deserialize = pRawToChar, + cell.deserialize = + function(x, column, family) pRawToChar(x)) { + filler = replicate(length(unlist(source))/2, NULL) + dest = + list( + key = filler, + family = filler, + column = filler, + cell = filler) + tmp = + .Call( + "hbase_to_df", + source, + dest, + PACKAGE="rmr2") + retval = data.frame( + key = + I( + key.deserialize( + tmp$data.frame$key[1:tmp$nrows])), + family = + pRawToChar( + tmp$data.frame$family[1:tmp$nrows]), + column = + pRawToChar( + tmp$data.frame$column[1:tmp$nrows]), + cell = + I( + cell.deserialize( + tmp$data.frame$cell[1:tmp$nrows], + tmp$data.frame$family[1:tmp$nrows], + tmp$data.frame$column[1:tmp$nrows]))) + if(atomic) + retval = + as.data.frame( + lapply( + retval, + function(x) if(is.factor(x)) x else unclass(x))) + if(dense) retval = dcast(retval, key ~ family + column) + retval} make.hbase.input.format = - function(dense, atomic, key.deserialize, cell.deserialize) { - deserialize.opt = - function(deser) { - if(is.null(deser)) deser = "raw" - if(is.character(deser)) - deser = - switch( - deser, - native = - function(x, family = NULL, column = NULL) lapply(x, unserialize), - typedbytes = - function(x, family = NULL, column = NULL) - typedbytes.reader( - do.call(c, x), - nobjs = length(x)), - raw = function(x, family = NULL, column = NULL) pRawToChar(x)) - deser} - key.deserialize = deserialize.opt(key.deserialize) - cell.deserialize = deserialize.opt(cell.deserialize) - tif = make.typedbytes.input.format(recycle = FALSE) - if(is.null(dense)) dense = FALSE - function(con, keyval.length) { - rec = tif(con, keyval.length) - if(is.null(rec)) NULL - else { - df = hbase.rec.to.data.frame(rec, atomic, dense, key.deserialize, cell.deserialize) - keyval(NULL, df)}}} + function(dense, atomic, key.deserialize, cell.deserialize) { + deserialize.opt = + function(deser) { + if(is.null(deser)) deser = "raw" + if(is.character(deser)) + deser = + switch( + deser, + native = + function(x, family = NULL, column = NULL) lapply(x, unserialize), + typedbytes = + function(x, family = NULL, column = NULL) + typedbytes.reader( + do.call(c, x), + nobjs = length(x)), + raw = function(x, family = NULL, column = NULL) pRawToChar(x)) + deser} + key.deserialize = deserialize.opt(key.deserialize) + cell.deserialize = deserialize.opt(cell.deserialize) + tif = make.typedbytes.input.format(recycle = FALSE) + if(is.null(dense)) dense = FALSE + function(con, keyval.length) { + rec = tif(con, keyval.length) + if(is.null(rec)) NULL + else { + df = hbase.rec.to.data.frame(rec, atomic, dense, key.deserialize, cell.deserialize) + keyval(NULL, df)}}} data.frame.to.nested.map = function(x,ind) { - if(length(ind)>0 && nrow(x) > 0) { - spl = split(x, x[,ind[1]]) - lapply(x[,ind[1]], function(y) keyval(as.character(y), data.frame.to.nested.map(spl[[y]], ind[-1])))} - else x$value} + if(length(ind)>0 && nrow(x) > 0) { + spl = split(x, x[,ind[1]]) + lapply(x[,ind[1]], function(y) keyval(as.character(y), data.frame.to.nested.map(spl[[y]], ind[-1])))} + else x$value} hbdf.to.m3 = Curry(data.frame.to.nested.map, ind = c("key", "family", "column")) # I/O make.keyval.readwriter = - function(mode, format, keyval.length, con = NULL, read) { - if(is.null(con)) - con = { - if(mode == "text") { - if(read) file("stdin", "r") #not stdin() which is parsed by the interpreter - else stdout()} - else { - cat = { - if(.Platform$OS.type == "windows") - paste( - "\"", - system.file( - package="rmr2", - "bin", - .Platform$r_arch, - "catwin.exe"), - "\"", - sep="") - else - "cat"} - pipe(cat, ifelse(read, "rb", "wb"))}} - if (read) { - function() - format(con, keyval.length)} - else { - function(kv) - format(kv, con)}} + function(mode, format, keyval.length, con = NULL, read) { + if(is.null(con)) + con = { + if(mode == "text") { + if(read) file("stdin", "r") #not stdin() which is parsed by the interpreter + else stdout()} + else { + cat = { + if(.Platform$OS.type == "windows") + paste( + "\"", + system.file( + package="rmr2", + "bin", + .Platform$r_arch, + "catwin.exe"), + "\"", + sep="") + else + "cat"} + pipe(cat, ifelse(read, "rb", "wb"))}} + if (read) { + function() + format(con, keyval.length)} + else { + function(kv) + format(kv, con)}} make.keyval.reader = Curry(make.keyval.readwriter, read = TRUE) make.keyval.writer = Curry(make.keyval.readwriter, keyval.length = NULL, read = FALSE) +paste.fromJSON = + function(...) + tryCatch( + rjson::fromJSON(paste("[", paste(..., sep = ","), "]")), + error = + function(e){ + if(is.element(e$message, paste0("unexpected character", c(" 'N'", " 'I'", ": I"), "\n"))) + e$message = ("Found unexpected character, try updating Avro to 1.7.7 or trunk") + stop(e$message)}) + +make.avro.input.format.function = + function(schema.file,...) { + schema = ravro:::avro_get_schema(file=schema.file) + function(con, n) { + lines = + readLines(con = con, n = n) + if (length(lines) == 0) NULL + else { + x = splat(paste.fromJSON)(lines) + y = ravro:::parse_avro(x, schema,encoded_unions=FALSE, + ...) + keyval(NULL, y)}}} + IO.formats = c("text", "json", "csv", "native", - "sequence.typedbytes", "hbase", - "pig.hive") + "sequence.typedbytes", "hbase", + "pig.hive", "avro") make.input.format = - function( - format = make.native.input.format(), - mode = c("binary", "text"), - streaming.format = NULL, - ...) { - mode = match.arg(mode) - backend.parameters = NULL - if(is.character(format)) { - format = match.arg(format, IO.formats) - switch( - format, - text = { - format = text.input.format - mode = "text"}, - json = { - format = make.json.input.format(...) - mode = "text"}, - csv = { - format = make.csv.input.format(...) - mode = "text"}, - native = { - format = make.native.input.format() - mode = "binary"}, - sequence.typedbytes = { - format = make.typedbytes.input.format() - mode = "binary"}, - pig.hive = { - format = - make.csv.input.format( - sep = "\001", - comment.char = "", - fill = TRUE, - flush = TRUE, - quote = "") - mode = "text"}, - hbase = { - optlist = list(...) - format = - make.hbase.input.format( - default(optlist$dense, F), - default(optlist$atomic, F), - default(optlist$key.deserialize, "raw"), - default(optlist$cell.deserialize, "raw")) - mode = "binary" - streaming.format = - "com.dappervision.hbase.mapred.TypedBytesTableInputFormat" - family.columns = optlist$family.columns - backend.parameters = - list( - hadoop = - list( - D = - paste( - "hbase.mapred.tablecolumns=", - sep = "", - paste( - collapse = " ", - sapply( - names(family.columns), - function(fam) - paste( - fam, - family.columns[[fam]], - sep = ":", - collapse = " ")))), - libjars = system.file(package = "rmr2", "hadoopy_hbase.jar")))})} - if(is.null(streaming.format) && mode == "binary") - streaming.format = "org.apache.hadoop.streaming.AutoInputFormat" - list(mode = mode, - format = format, - streaming.format = streaming.format, - backend.parameters = backend.parameters)} + function( + format = make.native.input.format(), + mode = c("binary", "text"), + streaming.format = NULL, + ...) { + mode = match.arg(mode) + backend.parameters = NULL + optlist = list(...) + if(is.character(format)) { + format = match.arg(format, IO.formats) + switch( + format, + text = { + format = text.input.format + mode = "text"}, + json = { + format = make.json.input.format(...) + mode = "text"}, + csv = { + format = make.csv.input.format(...) + mode = "text"}, + native = { + format = make.native.input.format() + mode = "binary"}, + sequence.typedbytes = { + format = make.typedbytes.input.format() + mode = "binary"}, + pig.hive = { + format = + make.csv.input.format( + sep = "\001", + comment.char = "", + fill = TRUE, + flush = TRUE, + quote = "") + mode = "text"}, + hbase = { + format = + make.hbase.input.format( + default(optlist$dense, F), + default(optlist$atomic, F), + default(optlist$key.deserialize, "raw"), + default(optlist$cell.deserialize, "raw")) + mode = "binary" + streaming.format = + "com.dappervision.hbase.mapred.TypedBytesTableInputFormat" + family.columns = optlist$family.columns + backend.parameters = + list( + hadoop = + list( + D = + paste( + "hbase.mapred.tablecolumns=", + sep = "", + paste( + collapse = " ", + sapply( + names(family.columns), + function(fam) + paste( + fam, + family.columns[[fam]], + sep = ":", + collapse = " ")))), + libjars = system.file(package = "rmr2", "hadoopy_hbase.jar")))}, + avro = { + format = make.avro.input.format.function(...) + mode = "text" + streaming.format = "org.apache.avro.mapred.AvroAsTextInputFormat" + backend.parameters = + list( + hadoop = + list( + libjars = + gsub(":", ",", Sys.getenv("AVRO_LIBS"))))})} + if(is.null(streaming.format) && mode == "binary") + streaming.format = "org.apache.hadoop.streaming.AutoInputFormat" + list( + mode = mode, + format = format, + streaming.format = streaming.format, + backend.parameters = backend.parameters)} set.separator.options = - function(sep) { - if(!is.null(sep)) - list( - hadoop = - list( - D = - paste( - "mapred.textoutputformat.separator=", - sep, - sep = ""), - D = - paste( - "stream.map.output.field.separator=", - sep, - sep = ""), - D = - paste( - "stream.reduce.output.field.separator=", - sep, - sep = "")))} + function(sep) { + if(!is.null(sep)) + list( + hadoop = + list( + D = + paste( + "mapred.textoutputformat.separator=", + sep, + sep = ""), + D = + paste( + "stream.map.output.field.separator=", + sep, + sep = ""), + D = + paste( + "stream.reduce.output.field.separator=", + sep, + sep = "")))} make.output.format = - function( - format = make.native.output.format(keyval.length = rmr.options('keyval.length')), - mode = c("binary", "text"), - streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat", - ...) { - mode = match.arg(mode) - backend.parameters = NULL - if(is.character(format)) { - format = match.arg(format, IO.formats) - switch( - format, - text = { - format = text.output.format - mode = "text" - streaming.format = NULL}, - json = { - format = json.output.format - mode = "text" - streaming.format = NULL}, - csv = { - format = make.csv.output.format(...) - mode = "text" - streaming.format = NULL - sep = list(...)$sep - backend.parameters = set.separator.options(sep)}, - pig.hive = { - format = - make.csv.output.format( - sep = "\001", - quote = FALSE) - mode = "text" - streaming.format = NULL}, - native = { - format = make.native.output.format( - keyval.length = rmr.options('keyval.length')) - mode = "binary" - streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat"}, - sequence.typedbytes = { - format = make.typedbytes.output.format(keyval.length = rmr.options('keyval.length')) - mode = "binary" - streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat"}, - hbase = { - stop("hbase output format not implemented yet") - format = make.typedbytes.output.format(recycle = FALSE) - mode = "binary" - streaming.format = "com.dappervision.mapreduce.TypedBytesTableOutputFormat" - backend.parameters = - list( - hadoop = - list( - D = paste( - "hbase.mapred.tablecolumns=", - list(...)$family, - ":", - list(...)$column, - sep = ""), - libjars = system.file(package = "rmr2", "java/hadoopy_hbase.jar")))})} - mode = match.arg(mode) - list(mode = mode, format = format, streaming.format = streaming.format, backend.parameters = backend.parameters)} + function( + format = make.native.output.format(keyval.length = rmr.options('keyval.length')), + mode = c("binary", "text"), + streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat", + ...) { + mode = match.arg(mode) + backend.parameters = NULL + if(is.character(format)) { + format = match.arg(format, IO.formats) + switch( + format, + text = { + format = text.output.format + mode = "text" + streaming.format = NULL}, + json = { + format = json.output.format + mode = "text" + streaming.format = NULL}, + csv = { + format = make.csv.output.format(...) + mode = "text" + streaming.format = NULL + sep = list(...)$sep + backend.parameters = set.separator.options(sep)}, + pig.hive = { + format = + make.csv.output.format( + sep = "\001", + quote = FALSE) + mode = "text" + streaming.format = NULL}, + native = { + format = make.native.output.format( + keyval.length = rmr.options('keyval.length')) + mode = "binary" + streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat"}, + sequence.typedbytes = { + format = make.typedbytes.output.format(keyval.length = rmr.options('keyval.length')) + mode = "binary" + streaming.format = "org.apache.hadoop.mapred.SequenceFileOutputFormat"}, + hbase = { + stop("hbase output format not implemented yet") + format = make.typedbytes.output.format(recycle = FALSE) + mode = "binary" + streaming.format = "com.dappervision.mapreduce.TypedBytesTableOutputFormat" + backend.parameters = + list( + hadoop = + list( + D = paste( + "hbase.mapred.tablecolumns=", + list(...)$family, + ":", + list(...)$column, + sep = ""), + libjars = system.file(package = "rmr2", "java/hadoopy_hbase.jar")))})} + mode = match.arg(mode) + list(mode = mode, format = format, streaming.format = streaming.format, backend.parameters = backend.parameters)} diff --git a/pkg/man/make.io.format.Rd b/pkg/man/make.io.format.Rd index 6838c324..29e7ca82 100644 --- a/pkg/man/make.io.format.Rd +++ b/pkg/man/make.io.format.Rd @@ -18,10 +18,26 @@ make.output.format(format = make.native.output.format( keyval.length = rmr.opt \item{format}{Either a string describing a predefined combination of IO settings (possibilities include: \code{"text"}, \code{"json"}, \code{"csv"}, \code{"native"},\code{"sequence.typedbytes"}, \code{"hbase"}, \code{"pig.hive"}) or a function. For an input format, this function accepts a connection and a number of records and returns a key-value pair (see \code{\link{keyval}}). For an output format, this function accepts a key-value pair and a connection and writes the former to the latter.} \item{mode}{Mode can be either \code{"text"} or \code{"binary"}, which tells R what type of connection to use when opening the IO connections.} \item{streaming.format}{Class to pass to hadoop streaming as \code{inputformat} or \code{outputformat} option. This class is the first in the input chain to perform its duties on the input side and the last on the output side. Right now this option is not honored in local mode.} - \item{\dots}{Additional arguments to the format function. For the csv format they detail the specifics of the csv dialect to use and are the same as for \code{\link{read.table}} and \code{\link{write.table}} for the input and output resp, with the exception of \code{header, file, x, nrows, col.names} and \code{row.names}, the latter two allowed for input only. For \code{"json"}, only on the input side, one can specify a \code{key.class} and a \code{value.class} to help in mapping the JSON data model to R's own more flexibly. For the \code{"native"} and \code{"sequence.typedbytes"} output formats the user can specify a \code{keyval.length} that says how many values to map to a single physical key-value pair when the key is \code{NULL}. For the \code{"hbase"} format, the table name is provided as the input argument to \code{\link{mapreduce}}. Additional arguments are: \code{family.columns}, a named list where the names are family names and the elements are lists of column names within each family; \code{key.deserialize} and \code{cell.deserialize} that control the deserialization of keys and cells resp. and can take a string value or a function (explained below); \code{dense} which contols whether the data read from hbase is returned as a 4-column data frame (key, family, column and cell) or a number of columns equal to the number of columns selected, plus one for the key; finally \code{atomic} which contols whether the data frame columns are atomic or returned "as is", see \code{\link{I}}. The allowed values for the deserialization argument are \code{"raw"}, which means cells are text; \code{"typdebytes"}, which is a serialization format shared with other elements of the Hadoop system; \code{"native"} which is the native R format; or a function that takes a list of raw vectors and returns a list or vector of deserialized objects. In the case of cell.deserialize the function should take two additional argmuments for the names of family and column being deserialized }} + \item{\dots}{Additional arguments to the format function, which depend on the format being defined. + \describe{ + \item{csv}{Additional arguments detail the specifics of the CSV dialect to use and are the same as for \code{\link{read.table}} and \code{\link{write.table}} for the input and output resp, with the exception of \code{header, file, x, nrows, col.names} and \code{row.names}, the latter two allowed for input only.} + \item{json}{Only for the input format, one can specify a \code{key.class} and a \code{value.class} to help in mapping the JSON data model to R's own more flexibly.} + \item{native and sequence.typedbytes}{The user can specify a \code{keyval.length} that says how many values to map to a single physical key-value pair when the key is \code{NULL}.} + \item{hbase}{The table name is provided as the input argument to \code{\link{mapreduce}}. Additional arguments are: \code{family.columns}, a named list where the names are family names and the elements are lists of column names within each family; \code{key.deserialize} and \code{cell.deserialize} that control the deserialization of keys and cells resp. and can take a string value or a function (explained below); \code{dense} which contols whether the data read from hbase is returned as a 4-column data frame (key, family, column and cell) or a number of columns equal to the number of columns selected, plus one for the key; finally \code{atomic} which contols whether the data frame columns are atomic or returned "as is", see \code{\link{I}}. The allowed values for the deserialization argument are \code{"raw"}, which means cells are text; \code{"typdebytes"}, which is a serialization format shared with other elements of the Hadoop system; \code{"native"} which is the native R format; or a function that takes a list of raw vectors and returns a list or vector of deserialized objects. In the case of \code{cell.deserialize} the function should take two additional argmuments for the names of family and column being deserialized.} + \item{avro}{(input only) It has one mandatory additional argument, \code{schema.file} that should provide the URL of a file containing an appropriate avro schema, can be the same as file to be read. The user can specify the protocol, for instance \code{file:} or \code{hdfs:} as part of the URL, with the first being the default.}}}} \details{ -The goal of these functions is to encapsulate some of the complexity of the IO settings, providing meaningful defaults and predefined combinations. If you don't want to deal with the full complexity of defining custom IO formats, there are prepackaged combinations. "text" is free text, useful mostly on the input side for NLP type applications; "json" is one or two tab separated, single line JSON objects per record; "csv" is the CSV format, configurable through additional arguments; "native.text" uses the internal R serialization in text mode, and was the default in previous releases, use only for backward compatibility; "native" uses the internal R serialization, offers the highest level of compatibility with R data types and is the default; "sequence.typedbytes" is a sequence file (in the Hadoop sense) where key and value are of type typedbytes, which is a simple serialization format used in connection with streaming for compatibility with other hadoop subsystems. Typedbytes is documented here \url{https://hadoop.apache.org/mapreduce/docs/current/api/org/apache/hadoop/typedbytes/package-summary.html}. "hbase" allows to read from (but not yet write to) an HBase table. This format should still considered experimental. Hadoop should be already configured to run streaming jobs on HBase tables \url{https://wiki.apache.org/hadoop/Hbase/MapReduce}. "pig.hive" is a variant of CSV to tranfer data to and from Hive or Pig, when using their default format \code{`ROW FORMAT DELIMITED FIELDS TERMINATED BY '001' LINES TERMINATED BY '\n'`}. +The goal of these functions is to encapsulate some of the complexity of the IO settings, providing meaningful defaults and predefined combinations. If you don't want to deal with the full complexity of defining custom IO formats, there are prepackaged combinations. +\describe{ +\item{text}{is free text, useful mostly on the input side for NLP type applications} +\item{json}{is one or two tab separated, single line JSON objects per record} +\item{csv}{is the CSV format, configurable through additional arguments} +\item{native.text}{uses the internal R serialization in text mode, and was the default in previous releases, use only for backward compatibility} +\item{native}{uses the internal R serialization, offers the highest level of compatibility with R data types and is the default} +\item{sequence.typedbytes}{is a sequence file (in the Hadoop sense) where key and value are of type typedbytes, which is a simple serialization format used in connection with streaming for compatibility with other hadoop subsystems. Typedbytes is documented here \url{https://hadoop.apache.org/mapreduce/docs/current/api/org/apache/hadoop/typedbytes/package-summary.html}.} +\item{hbase}{allows to read from (but not yet write to) an HBase table. This format should still considered experimental. Hadoop should be already configured to run streaming jobs on HBase tables \url{https://wiki.apache.org/hadoop/Hbase/MapReduce}.} +\item{"pig.hive"}{is a variant of CSV to tranfer data to and from Hive or Pig, when using their default format \code{`ROW FORMAT DELIMITED FIELDS TERMINATED BY '001' LINES TERMINATED BY '\n'`}.} +\item{avro}{(input only) is the format defined by the Apache Avro project.}} If you want to implement custom formats, the input processing is the result of the composition of a Java class and an R function, and the same is true on the output side but in reverse order and you can specify both as arguments to this functions.} \value{ diff --git a/pkg/tests/avro.R b/pkg/tests/avro.R new file mode 100644 index 00000000..0b7b65f7 --- /dev/null +++ b/pkg/tests/avro.R @@ -0,0 +1,245 @@ +# Copyright 2011 Revolution Analytics +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +library(rmr2) +library(testthat) +library(ravro) + +rmr.options(backend = "hadoop") +test_avro_rmr <- function(df,test,write.args=list(),input.format.args=list(),map=function(k,v)rmr.str(v)) { + if(rmr.options("backend") == "local") TRUE + else { + tf1 = tempfile(fileext=".avro") + expect_true(do.call(ravro:::write.avro,c(list(df, tf1),write.args))) + tf2 = rmr2:::dfs.tempfile() + tf3 = paste(tf2(), "data.avro", sep = "/") + rmr2:::hdfs.mkdir(tf2()) + rmr2:::hdfs.put(tf1, tf3) + df.input.format <- do.call(make.input.format, + c(list( + format = "avro", + schema.file = tf1), + input.format.args)) + retdf <- values( + from.dfs( + mapreduce( + tf2(), + map = map, + input.format = df.input.format))) + retdf <- retdf[row.names(df),] + attributes(retdf) <- attributes(retdf)[names(attributes(df))] + test(retdf) + }} + +expect_equal_avro_rmr <- function(df,...){ + row.names(df) <- row.names(df) # rmr2 uses row.names function which coerces to character + # We need to make sure row.names for x is character or else this will always fail + test_avro_rmr(df,function(x)expect_equal(x,df),...) +} + +expect_equivalent_avro_rmr <- function(df,...) + test_avro_rmr(df,function(x)expect_equivalent(x,df),...) + +expect_equivalent_avro_rmr( + read.avro(system.file("data/yield1k.avro",package="ravro")),write.args=list(unflatten=T)) + +d <- data.frame(x = 1, + y = as.factor(1:10), + fac = as.factor(sample(letters[1:3], 10, replace = TRUE))) +expect_equivalent_avro_rmr(d) + + +########################################################################################## + +context("Basic Avro Read/Write") + +### Handeling Factors +# Warnings: Factor levels converted to valid Avro names + +test_that("Handling factors", { + # Factors with non-"name" levels should still work + d <- data.frame(x = 1, + y = as.factor(1:10), + fac = as.factor(sample(letters[1:3], 10, replace = TRUE))) + expect_equal_avro_rmr(d) +}) + + +### Type Translation + +test_that("type translation", { + # All types should translate successfully + L3 <- LETTERS[1:3] + fac <- sample(L3, 10, replace = TRUE) + d <- data.frame(x = 1, y = 1:10, fac = fac, b = rep(c(TRUE, FALSE),5), c = rep(NA, 10), + stringsAsFactors=FALSE) + expect_equal_avro_rmr(d) + + d <- data.frame(x = 1, y = 1:10, fac = factor(fac,levels=L3), + b = rep(c(TRUE, FALSE),5), c = rep(NA, 10), + stringsAsFactors=FALSE) + expect_equal_avro_rmr(d) +}) + +### write can handle missing values + +test_that("write can handle missing values", { + # NA column (entirely "null" in Avro) + d <- data.frame(x = 1, + y = 1:10, + b = rep(c(TRUE, FALSE),5), + c = rep(NA, 10), + stringsAsFactors=FALSE) + expect_equal_avro_rmr(d) + + # NA row (entirely "null" in Avro) + d <- rbind(data.frame(x = 1, + y = 1:10, + b = rep(c(TRUE, FALSE),5)), + rep(NA,3)) + expect_equal_avro_rmr(d) +}) + +### NaNs throw warning + +test_that("NaNs throw warning", { + # NaN row (entirely "null" in Avro) + d <- rbind(data.frame(x = 1, + y = 1:10, + b = rep(c(TRUE, FALSE),5)), + rep(NaN,3)) + d[nrow(d),] <- NA + expect_equal_avro_rmr(d) + + # NaN row (entirely "null" in Avro) + d <- cbind(data.frame(x = 1, + y = 1:10, + b = rep(c(TRUE, FALSE),5)), + c=rep(NaN,10)) + d[,ncol(d)] <- as.numeric(NA) # coerce this type + expect_equal_avro_rmr(d) +}) + +### write.avro throws error on infinite values +## Infinite values cannot be serialied to Avro (which is good, what test verifies) + +test_that("write.avro throws error on infinite values", { + d <- rbind(data.frame(x = 1, y = 1:10, b = rep(c(TRUE, FALSE),5)), rep(NA,3), + c(Inf, 11, TRUE, NA)) + expect_that(expect_equal_avro_rmr(d), throws_error()) + + d <- rbind(data.frame(x = 1, y = 1:10, b = rep(c(TRUE, FALSE),5)), rep(NA,3), + c(-Inf, 11, TRUE, NA)) + expect_that(expect_equal_avro_rmr(d), throws_error()) +}) + +############################ Read/Write mtcars and iris ############################### + +context("Read/Write mtcars and iris") + +### mtcars round trip + +test_that("mtcars round trip", { + expect_equal_avro_rmr(mtcars) +}) + + +### factors level that are not Avro names read/write +## mttmp equivalent despite refactorization (good, warnings) +# 1: In (function (x, name = NULL, namespace = NULL, is.union = F, row.names = T, : +# Factor levels converted to valid Avro names: _3_ravro, _4_ravro, _5_ravro + +test_that("factors level that are not Avro names read/write", { + mttmp <- mtcars + mttmp$gear_factor <- as.factor(mttmp$gear) + expect_equal_avro_rmr(mttmp) +}) + + +### iris round trip +## iris_avro not equivalent +# Length mismatch: comparison on first 3 components + +test_that("iris round trip", { + # This doesn't work, because rmr2::from.dfs uses rbind to combine the values together + #expect_equal_avro_rmr(iris,write.args=list(unflatten=T),input.format.args=list(flatten=F)) + + expect_equal_avro_rmr(iris,write.args=list(unflatten=T),input.format.args=list(flatten=T)) +}) + +############################### Complex Example File ################################### + +context("Complex example file") + +complex.file.loc <- file.path(system.file("data",package="ravro"), + "complex.avro") + +### flattening works +## complex is not equivalent +# Error retrieving schema. +# Verify that the file exists and is a valid Avro: /tmp/RtmpPzKpBf/file1e4d2b5588fc.avro +## complex still not working with flatten=F and without flatten parameter + +test_that("flattening works", { + ## The complex data set is not yet supported by ravro:::write.avro + complex <- read.avro(complex.file.loc,flatten=T) + #expect_equal_avro_rmr(complex) + + complex <- read.avro(complex.file.loc, flatten=F) + #expect_equal_avro_rmr(complex) +}) + + + +################################# Read Example Avro Files ##################################### + +context("Read Example Avro Files") + +### yield reads unflattended correctly + +test_that("yield reads unflattened correctly", { + #yield <- read.avro(file.path(system.file("data",package="ravro"),"yield1k.avro"),flatten=F) + # rmr2:::from.dfs cannot combine un-flattened data values + #expect_equal_avro_rmr(yield) +}) + + +### yield reads flattened correctly +# equivalent, but not equal, because rmr2 inserts a "class" attribute with value "AsIs" for the nonStandardData + +test_that("yield reads flattened correctly", { + yield <- read.avro(file.path(system.file("data",package="ravro"),"yield1k.avro"),flatten=T) + expect_equivalent_avro_rmr(yield,write.args=list(unflatten=T)) +}) + + +### as-planted reads unflattened correctly + +test_that("as-planted reads unflattened correctly", { + #as_planted <- read.avro(file.path(system.file("data",package="ravro"),"as-planted1k.avro"), + flatten=F) + # rmr2:::from.dfs cannot combine un-flattened data values + #expect_equivalent_avro_rmr(as_planted) +}) + + +### as-planted reads flattened correctly +# equivalent, but not equal, because rmr2 inserts a "class" attribute with value "AsIs" for the nonStandardData + +test_that("as-planted reads flattened correctly", { + as_planted <- read.avro(file.path(system.file("data",package="ravro"),"as-planted1k.avro"), + flatten=T) + expect_equivalent_avro_rmr(as_planted,write.args=list(unflatten=T)) +}) + diff --git a/pkg/tests/mapreduce.R b/pkg/tests/mapreduce.R index 435b1951..9c8499f3 100644 --- a/pkg/tests/mapreduce.R +++ b/pkg/tests/mapreduce.R @@ -16,188 +16,217 @@ library(quickcheck) library(rmr2) for (be in c("local", "hadoop")) { - rmr.options(backend = be) - - - ## keyval compare - kv.cmp = function(kv1, kv2) { - kv1 = rmr2:::split.keyval(kv1) - kv2 = rmr2:::split.keyval(kv2) - o1 = order(unlist(keys(kv1))) - o2 = order(unlist(keys(kv2))) - isTRUE(all.equal(keys(kv1)[o1], keys(kv2)[o2], tolerance=1e-4, check.attributes=FALSE)) && - isTRUE(all.equal(values(kv1)[o1], values(kv2)[o2], tolerance=1e-4, check.attributes=FALSE)) } - - ##from.dfs to.dfs - ##native - unit.test( - function(kv) { - kv.cmp(kv, - from.dfs(to.dfs(kv)))}, - generators = list(rmr2:::tdgg.keyval()), - sample.size = 10) - - ## csv - unit.test( - function(df) { - isTRUE( - all.equal( - df, - values( - from.dfs( - to.dfs( - df, - format = "csv"), - format = make.input.format( - format = "csv"))), - tolerance = 1e-4, - check.attributes = FALSE))}, - generators = list(tdgg.data.frame()), - sample.size = 10) - - #json - fmt = "json" - unit.test( - function(df) { - isTRUE( - all.equal( - df, - values( - from.dfs( - to.dfs( - df, - format = fmt), - format = make.input.format("json", key.class = "list", value.class = "data.frame"))), - tolerance = 1e-4, - check.attributes = FALSE))}, - generators = list(tdgg.data.frame()), - sample.size = 10) - - #sequence.typedbytes - seq.tb.data.loss = - function(l) - rapply( - l, - function(x) if(class(x) == "raw") x else as.list(x), - how = "replace") - - fmt = "sequence.typedbytes" - unit.test( - function(l) { - isTRUE( - all.equal( - seq.tb.data.loss(l), - values( - from.dfs( - to.dfs( - keyval(1,l), - format = fmt), - format = fmt)), - tolerance = 1e-4, - check.attributes = FALSE))}, - generators = list(tdgg.list()), - precondition = function(l) length(l) > 0, - sample.size = 10) - - ##mapreduce - - ##simplest mapreduce, all default - unit.test(function(kv) { - if(rmr2:::length.keyval(kv) == 0) TRUE - else { - kv1 = from.dfs(mapreduce(input = to.dfs(kv))) - kv.cmp(kv, kv1)}}, - generators = list(rmr2:::tdgg.keyval()), - sample.size = 10) - - ##put in a reduce for good measure - unit.test(function(kv) { - if(length(kv) == 0) TRUE - else { - kv1 = from.dfs(mapreduce(input = to.dfs(kv), - reduce = to.reduce(identity))) - kv.cmp(kv, kv1)}}, - generators = list(rmr2:::tdgg.keyval()), - sample.size = 10) - - ## csv - unit.test( - function(df) { - df1 = - values( - from.dfs( - mapreduce( - to.dfs( - df, - format = "csv"), - input.format = "csv", - output.format = "csv"), - format = "csv")) - isTRUE( - all.equal( - df[do.call(order,df),], - df1[do.call(order,df1),], - tolerance = 1e-4, - check.attributes = FALSE))}, - generators = list(tdgg.data.frame()), - sample.size = 10) - - #json - # a more general test would be better for json but the subtleties of mapping R to to JSON are many - fmt = "json" - unit.test( - function(df) { - df1 = - values( - from.dfs( - mapreduce( - to.dfs( - df, - format = fmt), - input.format = make.input.format("json", key.class = "list", value.class = "data.frame"), - output.format = fmt), - format = make.input.format("json", key.class = "list", value.class = "data.frame"))) - isTRUE( - all.equal( - df[do.call(order,df),], - df1[do.call(order,df1),], - tolerance = 1e-4, - check.attributes = FALSE))}, - generators = list(tdgg.data.frame()), - sample.size = 10) - - #sequence.typedbytes - fmt = "sequence.typedbytes" - unit.test( - function(l) { - l = seq.tb.data.loss(l) - isTRUE( - all.equal( - l, - values( - from.dfs( - mapreduce( - to.dfs( - keyval(1,l), - format = fmt), - input.format = fmt, - output.format = fmt), - format = fmt)), - tolerance = 1e-4, - check.attributes = FALSE))}, - generators = list(tdgg.list()), - precondition = function(l) length(l) > 0, - sample.size = 10) - - #equijoin - stopifnot( - all( - apply( - values( - from.dfs( - equijoin( - left.input = to.dfs(keyval(1:10, (1:10)^2)), - right.input = to.dfs(keyval(1:10, (1:10)^3))))), - 1, - function(x) x[[1]]^(3/2) == x[[2]]))) + rmr.options(backend = be) + + + ## keyval compare + kv.cmp = function(kv1, kv2) { + kv1 = rmr2:::split.keyval(kv1) + kv2 = rmr2:::split.keyval(kv2) + o1 = order(unlist(keys(kv1))) + o2 = order(unlist(keys(kv2))) + isTRUE(all.equal(keys(kv1)[o1], keys(kv2)[o2], tolerance=1e-4, check.attributes=FALSE)) && + isTRUE(all.equal(values(kv1)[o1], values(kv2)[o2], tolerance=1e-4, check.attributes=FALSE)) } + + ##from.dfs to.dfs + ##native + unit.test( + function(kv) { + kv.cmp(kv, + from.dfs(to.dfs(kv)))}, + generators = list(rmr2:::tdgg.keyval()), + sample.size = 10) + + ## csv + unit.test( + function(df) { + isTRUE( + all.equal( + df, + values( + from.dfs( + to.dfs( + df, + format = "csv"), + format = make.input.format( + format = "csv"))), + tolerance = 1e-4, + check.attributes = FALSE))}, + generators = list(tdgg.data.frame()), + sample.size = 10) + + #json + fmt = "json" + unit.test( + function(df) { + isTRUE( + all.equal( + df, + values( + from.dfs( + to.dfs( + df, + format = fmt), + format = make.input.format("json", key.class = "list", value.class = "data.frame"))), + tolerance = 1e-4, + check.attributes = FALSE))}, + generators = list(tdgg.data.frame()), + sample.size = 10) + + #sequence.typedbytes + seq.tb.data.loss = + function(l) + rapply( + l, + function(x) if(class(x) == "raw") x else as.list(x), + how = "replace") + + fmt = "sequence.typedbytes" + unit.test( + function(l) { + isTRUE( + all.equal( + seq.tb.data.loss(l), + values( + from.dfs( + to.dfs( + keyval(1,l), + format = fmt), + format = fmt)), + tolerance = 1e-4, + check.attributes = FALSE))}, + generators = list(tdgg.list()), + precondition = function(l) length(l) > 0, + sample.size = 10) + + ##mapreduce + + ##simplest mapreduce, all default + unit.test(function(kv) { + if(rmr2:::length.keyval(kv) == 0) TRUE + else { + kv1 = from.dfs(mapreduce(input = to.dfs(kv))) + kv.cmp(kv, kv1)}}, + generators = list(rmr2:::tdgg.keyval()), + sample.size = 10) + + ##put in a reduce for good measure + unit.test(function(kv) { + if(length(kv) == 0) TRUE + else { + kv1 = from.dfs(mapreduce(input = to.dfs(kv), + reduce = to.reduce(identity))) + kv.cmp(kv, kv1)}}, + generators = list(rmr2:::tdgg.keyval()), + sample.size = 10) + + ## csv + unit.test( + function(df) { + df1 = + values( + from.dfs( + mapreduce( + to.dfs( + df, + format = "csv"), + input.format = "csv", + output.format = "csv"), + format = "csv")) + isTRUE( + all.equal( + df[do.call(order,df),], + df1[do.call(order,df1),], + tolerance = 1e-4, + check.attributes = FALSE))}, + generators = list(tdgg.data.frame()), + sample.size = 10) + + #json + # a more general test would be better for json but the subtleties of mapping R to to JSON are many + fmt = "json" + unit.test( + function(df) { + df1 = + values( + from.dfs( + mapreduce( + to.dfs( + df, + format = fmt), + input.format = make.input.format("json", key.class = "list", value.class = "data.frame"), + output.format = fmt), + format = make.input.format("json", key.class = "list", value.class = "data.frame"))) + isTRUE( + all.equal( + df[do.call(order,df),], + df1[do.call(order,df1),], + tolerance = 1e-4, + check.attributes = FALSE))}, + generators = list(tdgg.data.frame()), + sample.size = 10) + + #sequence.typedbytes + fmt = "sequence.typedbytes" + unit.test( + function(l) { + l = seq.tb.data.loss(l) + isTRUE( + all.equal( + l, + values( + from.dfs( + mapreduce( + to.dfs( + keyval(1,l), + format = fmt), + input.format = fmt, + output.format = fmt), + format = fmt)), + tolerance = 1e-4, + check.attributes = FALSE))}, + generators = list(tdgg.list()), + precondition = function(l) length(l) > 0, + sample.size = 10) + + #avro + + unit.test( + function(df) { + if(rmr.options("backend") == "local") TRUE + else { + tf1 = tempfile() + ravro:::write.avro(df, tf1) + tf2 = rmr2:::dfs.tempfile() + tf3 = paste(tf2(), "avro", sep = ".") + rmr2:::hdfs.put(tf1, tf3) + isTRUE( + all.equal( + df, + values( + from.dfs( + mapreduce( + tf3, + map = function(k,v) rmr.str(v), + input.format = + make.input.format( + format = "avro", + schema.file = tf1)))), #paste("file", tf1, sep = ":"))))), + tolerance = 1e-4, + check.attributes = FALSE))}}, + generators = list(tdgg.data.frame()), + sample.size = 10) + + + #equijoin + stopifnot( + all( + apply( + values( + from.dfs( + equijoin( + left.input = to.dfs(keyval(1:10, (1:10)^2)), + right.input = to.dfs(keyval(1:10, (1:10)^3))))), + 1, + function(x) x[[1]]^(3/2) == x[[2]]))) } \ No newline at end of file