Freezedevil
Lieutenant
- Registriert
- Mai 2011
- Beiträge
- 649
Hallo,
ich hab ein Problem mit den Datentypen von Cascading. Scheinbar sind alle Elemente eines Tupels standardmäßig Strings, aber ich möchte jetzt gerne über ein errechnetes Attribut joinen und das geht wegen den verschiedenen Datentypen schief. Kennt jemand eine Möglichkeit irgendwo anzugeben, welche Datentypen ein Tupel enthält?
Nochmal etwas konkreter
Es geht im Prinzip um das CoGroup in Zeile 46. nextatime ist berechnet und daher vom Typ Integer, während min lediglich gelesen und daher ein String ist. Ich würde jetzt gern irgendwo definieren, dass min ein Integer ist. Mir ist übrigens klar, dass ich aus nextatime auch einen String machen kann, aber das will ich eigentlich vermeiden.
Function Normalize (der Name is echt scheiße, aber mir ist spontan nichts sinnvolles eingefallen)
Das sollten eigentlich alle relevanten Schnipsel sein. Ich hoffe jemand kennt sich besser mit Cascading aus als ich.
ich hab ein Problem mit den Datentypen von Cascading. Scheinbar sind alle Elemente eines Tupels standardmäßig Strings, aber ich möchte jetzt gerne über ein errechnetes Attribut joinen und das geht wegen den verschiedenen Datentypen schief. Kennt jemand eine Möglichkeit irgendwo anzugeben, welche Datentypen ein Tupel enthält?
Nochmal etwas konkreter
Es geht im Prinzip um das CoGroup in Zeile 46. nextatime ist berechnet und daher vom Typ Integer, während min lediglich gelesen und daher ein String ist. Ich würde jetzt gern irgendwo definieren, dass min ein Integer ist. Mir ist übrigens klar, dass ich aus nextatime auch einen String machen kann, aber das will ich eigentlich vermeiden.
Code:
public Flow getFlow() {
// reads input line by line -> field name "line"
Scheme sourceScheme = new TextLine(new Fields("line"));
Tap sourceTap = new Hfs(sourceScheme, inPath);
// writes output line by line
// replaces old output
Scheme sinkScheme = new TextLine();
Tap sinkTap = new Hfs(sinkScheme, outPath, SinkMode.REPLACE);
Pipe assembly = new Pipe("travelTimeEstimation");
// parses each input line into 9 numbers
// input tuples ("line")
String regexString = "^([0-9]+),([0-9]+),([0-9]+),([0-9]+),([0-9]+),([0-9]+),([0-9]+),([0-9]+)$";
Fields fieldDeclaration = new Fields("day", "min", "xway", "dir", "seg", "lav", "cnt", "toll");
int[] groups = {1, 2, 3, 4, 5, 6, 7, 8}; // 0 is the whole line
RegexParser parser = new RegexParser(fieldDeclaration, regexString, groups);
assembly = new Each( assembly, new Fields( "line" ), parser);
// output tuples ("day","min","xway","dir","seg","lav","cnt","toll")
// filters tuples which match the given information
assembly = new Each(assembly, new RelevantTupleFilter(
params.get("dow"),
params.get("tod"),
params.get("xway"),
params.get("sinit"),
params.get("send")
));
assembly = new GroupBy(assembly, new Fields("seg", "min"));
assembly = new Every(assembly, new Fields("lav"), new Average(new Fields("avlav")));
assembly = new Every(assembly, new Fields("toll"), new Average(new Fields("avtoll")));
// output tuples after grouping and aggregation
// ("seg", "min", "avlav", "avtoll")
Pipe[] segments = splitIntoSegments(assembly);
segments[0] = new Each(segments[0], new Normalize());
// output ("seg", "min", "avlav", "avtoll", "traveltime", "nextatime")
Pipe lhs = segments[0];
Pipe rhs = segments[1];
Fields declared = new Fields("seg", "min", "avlav", "avtoll", "traveltime", "nextatime", "seg2", "min2", "avlav2", "avtoll2");
assembly = new CoGroup(lhs, new Fields("nextatime"), rhs, new Fields("min"), declared, new InnerJoin());
// constructs and returns the flow
HadoopFlowConnector flowConnector = new HadoopFlowConnector();
Flow f = flowConnector.connect("travel", sourceTap, sinkTap, assembly);
f.writeDOT("dot/travel.dot");
return f;
}
Function Normalize (der Name is echt scheiße, aber mir ist spontan nichts sinnvolles eingefallen)
Code:
private class Normalize extends BaseOperation implements Function, Serializable {
public Normalize() {
super(4, new Fields("seg", "min", "avlav", "avtoll", "traveltime", "nextatime")); // expects 4-tuples as input
}
public void operate(FlowProcess fp, FunctionCall call) {
// get the arguments TupleEntry
TupleEntry args = call.getArguments();
// create a Tuple to hold our result values
Tuple nextatime = new Tuple();
Tuple traveltime = new Tuple();
float ttime = 3600/args.getFloat("avlav");
traveltime.addFloat(ttime);
nextatime.addInteger(args.getInteger("min") + ((int)ttime)/60);
// return the result Tuple
args = args.appendNew(new TupleEntry(new Fields("traveltime"), traveltime));
args = args.appendNew(new TupleEntry(new Fields("nextatime"), nextatime));
call.getOutputCollector().add(args);
}
}
Das sollten eigentlich alle relevanten Schnipsel sein. Ich hoffe jemand kennt sich besser mit Cascading aus als ich.