-
Notifications
You must be signed in to change notification settings - Fork 0
/
StoreLookup.java
88 lines (73 loc) · 3 KB
/
StoreLookup.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package com.linkedin.beam.store;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
 * Looks up a remote store using the key of each element of the main input {@link PCollection}.
 *
 * <p>For every input element {@code KV<key, inputValue>} the store is queried with {@code key},
 * and the transform emits {@code KV<key, KV<inputValue, storeValue>>}.
 *
 * @param <KS> key type, shared by the input elements and the store
 * @param <VS> value type of the input elements
 * @param <VT> value type returned by the store
 */
public class StoreLookup<KS, VS, VT>
    extends PTransform<PCollection<KV<KS, VS>>, PCollection<KV<KS, KV<VS, VT>>>> {

  /**
   * Creates a lookup transform without a coder for store values.
   *
   * <p>Kept only for source compatibility: without a store value coder the output coder cannot be
   * built, so applying the returned transform throws {@link IllegalStateException}.
   *
   * @param storeConfig settings handed to {@link #createStoreClient(Map)}
   * @throws NullPointerException if {@code storeConfig} is null
   * @deprecated use {@link #of(Map, Coder)} and pass the coder for store values.
   */
  @Deprecated
  public static <KS, VS, VT> StoreLookup<KS, VS, VT> of(Map<String, String> storeConfig) {
    return new StoreLookup<>(storeConfig, null);
  }

  /**
   * Creates a lookup transform.
   *
   * @param storeConfig settings handed to {@link #createStoreClient(Map)} when the DoFn is set up
   * @param storeValueCoder coder used for the values returned by the store
   * @throws NullPointerException if either argument is null
   */
  public static <KS, VS, VT> StoreLookup<KS, VS, VT> of(
      Map<String, String> storeConfig, Coder<VT> storeValueCoder) {
    return new StoreLookup<>(
        storeConfig, Objects.requireNonNull(storeValueCoder, "storeValueCoder"));
  }

  private final Map<String, String> storeConfig;
  /** Coder for store values; null only when created through the deprecated {@code of(Map)}. */
  private final Coder<VT> storeValueCoder;

  private StoreLookup(Map<String, String> storeConfig, Coder<VT> storeValueCoder) {
    this.storeConfig = Objects.requireNonNull(storeConfig, "storeConfig");
    this.storeValueCoder = storeValueCoder;
  }

  /**
   * Applies the lookup to {@code input}.
   *
   * @throws IllegalStateException if no store value coder was supplied
   * @throws IllegalArgumentException if {@code input} is not encoded with a {@link KvCoder}
   */
  @Override
  public PCollection<KV<KS, KV<VS, VT>>> expand(PCollection<KV<KS, VS>> input) {
    if (storeValueCoder == null) {
      throw new IllegalStateException(
          "StoreLookup has no coder for store values; create it with"
              + " StoreLookup.of(storeConfig, storeValueCoder)");
    }
    Coder<KV<KS, VS>> inputCoder = input.getCoder();
    if (!(inputCoder instanceof KvCoder)) {
      throw new IllegalArgumentException(
          "StoreLookup requires an input PCollection encoded with KvCoder, but its coder is "
              + inputCoder);
    }
    KvCoder<KS, VS> inputKvCoder = (KvCoder<KS, VS>) inputCoder;
    // The output coder is set explicitly: it reuses the input's key and value coders and pairs
    // the input value coder with the store value coder.
    return input
        .apply(ParDo.of(new LookupFn<KS, VS, VT>(storeConfig)))
        .setCoder(
            KvCoder.of(
                inputKvCoder.getKeyCoder(),
                KvCoder.of(inputKvCoder.getValueCoder(), storeValueCoder)));
  }

  /**
   * Queries the store with each element's key and pairs the input value with the store value.
   *
   * <p>Static (rather than an anonymous inner class) so that serializing the DoFn carries only the
   * store config, not the enclosing transform.
   */
  private static final class LookupFn<KS, VS, VT>
      extends DoFn<KV<KS, VS>, KV<KS, KV<VS, VT>>> {
    private final Map<String, String> storeConfig;
    // Not serialized with the DoFn; created in setup() for each DoFn instance.
    private transient StoreClient<KS, VT> storeClient;

    LookupFn(Map<String, String> storeConfig) {
      this.storeConfig = storeConfig;
    }

    @Setup
    public void setup() {
      StoreClient<KS, VT> client = createStoreClient(storeConfig);
      if (client == null) {
        // Fail here with a clear message rather than with an NPE on the first element.
        throw new IllegalStateException(
            "createStoreClient returned null; StoreLookup.createStoreClient must be implemented");
      }
      storeClient = client;
    }

    @ProcessElement
    public void process(ProcessContext context) {
      KV<KS, VS> element = context.element();
      KS key = element.getKey();
      // NOTE(review): if the store can return null (e.g. for a missing key), the output coder must
      // accept nulls (for example NullableCoder around storeValueCoder) — confirm the contract.
      VT storeValue = storeClient.get(key);
      context.output(KV.of(key, KV.of(element.getValue(), storeValue)));
    }
  }

  /** Minimal client abstraction for the remote store. */
  interface StoreClient<K, V> {
    /**
     * Fetches the store value for {@code key}.
     *
     * <p>NOTE(review): the result for an absent key is not specified here — confirm.
     */
    V get(K key);
  }

  /**
   * Creates the store client used by the lookup DoFn.
   *
   * <p>TODO: placeholder that currently returns {@code null}; the lookup DoFn rejects a null
   * client with {@link IllegalStateException} during setup.
   *
   * @param config store settings as passed to {@code of(...)}
   */
  static <K, V> StoreClient<K, V> createStoreClient(Map<String, String> config) {
    return null;
  }

  /**
   * Example pipeline that applies {@link StoreLookup} to a single dummy element.
   *
   * <p>Until {@link #createStoreClient(Map)} is implemented, running this fails during DoFn setup.
   */
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);
    // dummy main input
    PCollection<KV<Integer, String>> input = pipeline.apply(Create.of(KV.of(1, "a")));
    // store config
    Map<String, String> storeConfig = ImmutableMap.of("connect.url", "127.0.0.1");
    // look up store based on the input key; store values are strings in this example
    PCollection<KV<Integer, KV<String, String>>> output =
        input.apply(StoreLookup.of(storeConfig, StringUtf8Coder.of()));
    pipeline.run().waitUntilFinish();
  }
}