Skip to content

Commit

Permalink
Second attempt and fixing the line values.
Browse files Browse the repository at this point in the history
  • Loading branch information
Byron Ellis committed Aug 17, 2023
1 parent 56f0bc1 commit 7cfc6a7
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 44 deletions.
6 changes: 6 additions & 0 deletions sdks/swift/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,10 @@ let package = Package(
)
```

## Writing a Pipeline

```
import ApacheBeam
29 changes: 15 additions & 14 deletions sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
public extension PCollection {

/// Each time the input fires output all of the values in this list.
func create<Value:Codable>(_ values: [Value],_ name:String = "\(#file):\(#line)") -> PCollection<Value> {
return pardo(name,values) { values,input,output in
func create<Value:Codable>(_ values: [Value],_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> {

return pardo(name,_file:_file,_line:_line,values) { values,input,output in
for try await (_,ts,w) in input {
for v in values {
output.emit(v,timestamp:ts,window:w)
Expand All @@ -35,8 +36,8 @@ public extension PCollection {
public extension PCollection {

@discardableResult
func log(prefix:String,name:String = "\(#file):\(#line)") -> PCollection<Of> where Of == String {
pardo(name,prefix) { prefix,input,output in
func log(prefix:String,_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Of> where Of == String {
pardo(name,_file:_file,_line:_line,prefix) { prefix,input,output in
for await element in input {
print("\(prefix): \(element)")
output.emit(element)
Expand All @@ -45,8 +46,8 @@ public extension PCollection {
}

@discardableResult
func log<K,V>(prefix:String,name:String = "\(#file):\(#line)") -> PCollection<KV<K,V>> where Of == KV<K,V> {
pardo(name,prefix) { prefix,input,output in
func log<K,V>(prefix:String,_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<KV<K,V>> where Of == KV<K,V> {
pardo(name,_file:_file,_line:_line,prefix) { prefix,input,output in
for await element in input {
let kv = element.0
for v in kv.values {
Expand All @@ -62,16 +63,16 @@ public extension PCollection {
public extension PCollection {

/// Modify a value without changing its window or timestamp
func map<Out>(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> {
return pardo(name) { input,output in
func map<Out>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> {
return pardo(name,_file:_file,_line:_line) { input,output in
for try await (v,ts,w) in input {
output.emit(fn(v),timestamp:ts,window:w)
}
}
}

func map<K,V>(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> {
return pardo(name) { input,output in
func map<K,V>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> {
return pardo(name,_file:_file,_line:_line) { input,output in
for try await (i,ts,w) in input {
let (key,value) = fn(i)
output.emit(KV(key,value),timestamp:ts,window:w)
Expand All @@ -80,8 +81,8 @@ public extension PCollection {
}

/// Produce multiple outputs as a single value without modifying window or timestamp
func flatMap<Out>(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> [Out]) -> PCollection<Out> {
return pardo(name) { input,output in
func flatMap<Out>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> [Out]) -> PCollection<Out> {
return pardo(name,_file:_file,_line:_line) { input,output in
for try await (v,ts,w) in input {
for i in fn(v) {
output.emit(i,timestamp:ts,window:w)
Expand All @@ -94,7 +95,7 @@ public extension PCollection {

public extension PCollection<Never> {
/// Convenience function to add an impulse when we are at the root of the pipeline
func create<Value:Codable>(_ values: [Value],_ name:String = "\(#file):\(#line)") -> PCollection<Value> {
return impulse().create(values,name)
func create<Value:Codable>(_ values: [Value],_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> {
return impulse().create(values,name,_file:_file,_line:_line)
}
}
48 changes: 24 additions & 24 deletions sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,68 +33,68 @@ public extension PCollection {
// TODO: Replace with parameter pack version once https://github.com/apple/swift/issues/67192 is resolved

// No Output
func pardo<F:SerializableFn>(_ name: String = "\(#file):\(#line)",_ fn: F) {
self.apply(.pardo(name, fn, []))
func pardo<F:SerializableFn>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F) {
self.apply(.pardo(name ?? "\(_file):\(_line)", fn, []))
}
func pardo(_ name: String = "\(#file):\(#line)",_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) {
self.apply(.pardo(name, ClosureFn(fn),[]))
func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) {
self.apply(.pardo(name ?? "\(_file):\(_line)", ClosureFn(fn),[]))
}
func pardo<Param:Codable>(_ name: String = "\(#file):\(#line)",_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) {
self.apply(.pardo(name, ParameterizedClosureFn(param,fn), []))
func pardo<Param:Codable>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) {
self.apply(.pardo(name ?? "\(_file):\(_line)", ParameterizedClosureFn(param,fn), []))
}


// Single Output
func pardo<F:SerializableFn,O0>(_ name: String = "\(#file):\(#line)",_ fn: F,
func pardo<F:SerializableFn,O0>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F,
_ o0:PCollection<O0>) {
self.apply(.pardo(name, fn, [AnyPCollection(o0)]))
self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [AnyPCollection(o0)]))
}
func pardo<O0>(_ name: String = "\(#file):\(#line)",
func pardo<O0>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,
_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) {
let output = PCollection<O0>()
self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output)]))
self.apply(.pardo(name ?? "\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output)]))
return output
}
func pardo<Param:Codable,O0>(_ name: String = "\(#file):\(#line)",_ param: Param,
func pardo<Param:Codable,O0>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param: Param,
_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) {
let output = PCollection<O0>()
self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output)]))
self.apply(.pardo(name ?? "\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output)]))
return output
}

// Two Outputs
func pardo<F:SerializableFn,O0,O1>(_ name: String = "\(#file):\(#line)",_ fn: F,
func pardo<F:SerializableFn,O0,O1>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F,
_ o0:PCollection<O0>,_ o1:PCollection<O1>) {
self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1)]))
self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1)]))
}
func pardo<O0,O1>(_ name: String = "\(#file):\(#line)",
func pardo<O0,O1>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,
_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
let output = (PCollection<O0>(),PCollection<O1>())
self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
self.apply(.pardo(name ?? "\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
return output
}
func pardo<Param:Codable,O0,O1>(_ name: String = "\(#file):\(#line)",_ param: Param,
func pardo<Param:Codable,O0,O1>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param: Param,
_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
let output = (PCollection<O0>(),PCollection<O1>())
self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
self.apply(.pardo(name ?? "\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
return output
}

// Three Outputs
func pardo<F:SerializableFn,O0,O1,O2>(_ name: String = "\(#file):\(#line)",_ fn: F,
func pardo<F:SerializableFn,O0,O1,O2>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F,
_ o0:PCollection<O0>,_ o1:PCollection<O1>,_ o2:PCollection<O2>) {
self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)]))
self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)]))
}
func pardo<O0,O1,O2>(_ name: String = "\(#file):\(#line)",
func pardo<O0,O1,O2>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,
_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
self.apply(.pardo(name ?? "\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
return output
}
func pardo<Param:Codable,O0,O1,O2>(_ name: String = "\(#file):\(#line)",_ param: Param,
func pardo<Param:Codable,O0,O1,O2>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param: Param,
_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
self.apply(.pardo(name ?? "\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
return output
}

Expand Down
8 changes: 4 additions & 4 deletions sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

/// Basic reducers
public extension PCollection {
func reduce<Result:Codable,K,V>(name:String = "\(#file):\(#line)",into:Result,_ accumulator: @Sendable @escaping (V,inout Result) -> Void) -> PCollection<KV<K,Result>> where Of == KV<K,V> {
return pardo(name,into) { initialValue,input,output in
func reduce<Result:Codable,K,V>(name:String? = nil,_file:String=#fileID,_line:Int=#line,into:Result,_ accumulator: @Sendable @escaping (V,inout Result) -> Void) -> PCollection<KV<K,Result>> where Of == KV<K,V> {
return pardo(name,_file:_file,_line:_line,into) { initialValue,input,output in
for await (kv,ts,w) in input {
var result = initialValue
for v in kv.values {
Expand All @@ -34,7 +34,7 @@ public extension PCollection {

/// Convenience functions
public extension PCollection {
func sum<K,V:Numeric&Codable>() -> PCollection<KV<K,V>> where Of == KV<K,V> {
return reduce(into: 0,{ a,b in b = b + a })
func sum<K,V:Numeric&Codable>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<KV<K,V>> where Of == KV<K,V> {
return reduce(name:name,_file:_file,_line:_line,into: 0,{ a,b in b = b + a })
}
}
4 changes: 2 additions & 2 deletions sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
/// Basic grouping functionality
///
public extension PCollection {
func groupBy<K,V>(name: String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> {
return map(name:name,fn)
func groupBy<K,V>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> {
return map(name,_file:_file,_line:_line,fn)
.groupByKey()
}
}

0 comments on commit 7cfc6a7

Please sign in to comment.