Flink: left join three DataStreams using coGroup
I'm trying to merge three streams into a single stream. I tried union but could not proceed because the schemas are different, and merging them into one schema would make it too large.
So I'm using **coGroup** to do a left join and return a tuple holding the records from the three streams.
```java
DataStream<Tuple3<SCHEMA1, SCHEMA2, SCHEMA3>> mergeJoin = stream1.coGroup(stream2)
        .where(genericRecord -> { return SchemaUtils.getKey1(genericRecord); })
        .equalTo(genericRecord -> { return SchemaUtils.getKey2(genericRecord); })
        .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
        .apply(new LeftOuterJoin1())
        .join(stream3)
        .where(tuple_Schema1_Schema2 -> { return SchemaUtils.getKey1(tuple_Schema1_Schema2); })
        .equalTo(genericRecord -> { return SchemaUtils.getKey3(genericRecord); })
        .window(TumblingProcessingTimeWindows.of(Time.seconds(6)))
        .apply(new LeftOuterJoin2());

public static class LeftOuterJoin2 implements CoGroupFunction<Tuple2<SCHEMA1, SCHEMA2>, GenericRecord, Tuple3<SCHEMA1, SCHEMA2, SCHEMA3>> {
    @Override
    public void coGroup(Iterable<Tuple2<SCHEMA1, SCHEMA2>> iterable, Iterable<GenericRecord> iterable1, Collector<Tuple3<SCHEMA1, SCHEMA2, SCHEMA3>> collector) throws Exception {
        final SCHEMA3 NULL_ELEMENT = null;
        ObjectMapper mapper = new ObjectMapper();
        for (Tuple2<SCHEMA1, SCHEMA2> leftElem : iterable) {
            boolean hadElements = false;
            // Emit one tuple per matching right-side record.
            for (GenericRecord rightElem : iterable1) {
                SCHEMA1 schema1_data = leftElem.f0;
                SCHEMA2 schema2_data = leftElem.f1;
                SCHEMA3 schema3_data = mapper.readValue(rightElem.toString(), SCHEMA3.class);
                collector.collect(new Tuple3<>(schema1_data, schema2_data, schema3_data));
                hadElements = true;
            }
            // No match on the right side: keep the left record and pad with null.
            if (!hadElements) {
                SCHEMA1 schema1_data = leftElem.f0;
                SCHEMA2 schema2_data = leftElem.f1;
                collector.collect(new Tuple3<>(schema1_data, schema2_data, NULL_ELEMENT));
            }
        }
    }
}
```
When joining the tuple stream produced from the first two streams with the third, the call **apply(new LeftOuterJoin2())** fails with:
>
> Cannot resolve method 'apply(LeftOuterJoin2)'
Expectation 1: a single stream of tuples combining the records from all three streams.
Expectation 2: the number of output records should equal the number of SCHEMA1 records.
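
A note on the likely cause, offered as a guess rather than a confirmed diagnosis: in Flink's DataStream API, `join(...)` builds a `JoinedStreams` whose windowed `apply` takes a `JoinFunction` or `FlatJoinFunction`, while `coGroup(...)` builds a `CoGroupedStreams` whose `apply` takes a `CoGroupFunction`. Since `LeftOuterJoin2` implements `CoGroupFunction`, the second stage probably needs `.coGroup(stream3)` instead of `.join(stream3)`. The plain-Java sketch below has no Flink dependency; it only mirrors the per-key, per-window left-outer logic of the `coGroup` body so the record counts are easy to check. `LeftOuterJoinSketch`, `Pair`, and `leftOuterJoin` are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LeftOuterJoinSketch {

    /** Hypothetical stand-in for a joined record; right is null when nothing matched. */
    public static final class Pair<L, R> {
        public final L left;
        public final R right;

        public Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }

        @Override
        public String toString() {
            return "(" + left + ", " + right + ")";
        }
    }

    /**
     * Mirrors the body of a left-outer coGroup for ONE key in ONE window:
     * both arguments are assumed to already hold only the records of that group.
     */
    public static <L, R> List<Pair<L, R>> leftOuterJoin(Iterable<L> lefts, Iterable<R> rights) {
        List<Pair<L, R>> out = new ArrayList<>();
        for (L left : lefts) {
            boolean matched = false;
            // One output record per matching right-side record.
            for (R right : rights) {
                out.add(new Pair<>(left, right));
                matched = true;
            }
            // Left record without a match is kept and padded with null.
            if (!matched) {
                out.add(new Pair<>(left, null));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Left record with no right-side match in the window.
        System.out.println(leftOuterJoin(Arrays.asList("s1"), Collections.<String>emptyList()));
        // Left record with two right-side matches in the window.
        System.out.println(leftOuterJoin(Arrays.asList("s1"), Arrays.asList("s3-a", "s3-b")));
    }
}
```

With this logic, a left record is emitted once when there is no match and once per right-side record otherwise. So the output count equals the SCHEMA1 count only if each key has at most one record on the right side within a window; if several can arrive, the function would have to pick or aggregate one of them.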