diff --git a/sdks/swift/README.md b/sdks/swift/README.md index 810ccae8b24f..d48def0f4812 100644 --- a/sdks/swift/README.md +++ b/sdks/swift/README.md @@ -16,4 +16,10 @@ let package = Package( ) ``` +## Writing a Pipeline + +``` +import ApacheBeam + + diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift index 0b97a084ccf2..9be08330b469 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift @@ -20,8 +20,9 @@ public extension PCollection { /// Each time the input fires output all of the values in this list. - func create(_ values: [Value],_ name:String = "\(#file):\(#line)") -> PCollection { - return pardo(name,values) { values,input,output in + func create(_ values: [Value],_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection { + + return pardo(name,_file:_file,_line:_line,values) { values,input,output in for try await (_,ts,w) in input { for v in values { output.emit(v,timestamp:ts,window:w) @@ -35,8 +36,8 @@ public extension PCollection { public extension PCollection { @discardableResult - func log(prefix:String,name:String = "\(#file):\(#line)") -> PCollection where Of == String { - pardo(name,prefix) { prefix,input,output in + func log(prefix:String,_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection where Of == String { + pardo(name,_file:_file,_line:_line,prefix) { prefix,input,output in for await element in input { print("\(prefix): \(element)") output.emit(element) @@ -45,8 +46,8 @@ public extension PCollection { } @discardableResult - func log(prefix:String,name:String = "\(#file):\(#line)") -> PCollection> where Of == KV { - pardo(name,prefix) { prefix,input,output in + func log(prefix:String,_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection> where Of == KV { + pardo(name,_file:_file,_line:_line,prefix) { prefix,input,output in for await element in input { let kv = element.0 for v in kv.values { @@ -62,16 +63,16 @@ public extension PCollection { public extension PCollection { /// Modify a value without changing its window or timestamp - func map(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> Out) -> PCollection { - return pardo(name) { input,output in + func map(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> Out) -> PCollection { + return pardo(name,_file:_file,_line:_line) { input,output in for try await (v,ts,w) in input { output.emit(fn(v),timestamp:ts,window:w) } } } - func map(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection> { - return pardo(name) { input,output in + func map(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection> { + return pardo(name,_file:_file,_line:_line) { input,output in for try await (i,ts,w) in input { let (key,value) = fn(i) output.emit(KV(key,value),timestamp:ts,window:w) @@ -80,8 +81,8 @@ public extension PCollection { } /// Produce multiple outputs as a single value without modifying window or timestamp - func flatMap(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> [Out]) -> PCollection { - return pardo(name) { input,output in + func flatMap(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> [Out]) -> PCollection { + return pardo(name,_file:_file,_line:_line) { input,output in for try await (v,ts,w) in input { for i in fn(v) { output.emit(i,timestamp:ts,window:w) @@ -94,7 +95,7 @@ public extension PCollection { public extension PCollection { /// Convenience function to add an impulse when we are at the root of the pipeline - func create(_ values: [Value],_ name:String = "\(#file):\(#line)") -> PCollection { - return impulse().create(values,name) + func create(_ values: [Value],_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection { + return impulse().create(values,name,_file:_file,_line:_line) } } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift index 3684b2390506..4cc3532da210 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift @@ -33,68 +33,68 @@ public extension PCollection { // TODO: Replace with parameter pack version once https://github.com/apple/swift/issues/67192 is resolved // No Output - func pardo(_ name: String = "\(#file):\(#line)",_ fn: F) { - self.apply(.pardo(name, fn, [])) + func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F) { + self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [])) } - func pardo(_ name: String = "\(#file):\(#line)",_ fn: @Sendable @escaping (PCollection.Stream) async throws -> Void) { - self.apply(.pardo(name, ClosureFn(fn),[])) + func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (PCollection.Stream) async throws -> Void) { + self.apply(.pardo(name ?? "\(_file):\(_line)", ClosureFn(fn),[])) } - func pardo(_ name: String = "\(#file):\(#line)",_ param:Param,_ fn: @Sendable @escaping (Param,PCollection.Stream) async throws -> Void) { - self.apply(.pardo(name, ParameterizedClosureFn(param,fn), [])) + func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param:Param,_ fn: @Sendable @escaping (Param,PCollection.Stream) async throws -> Void) { + self.apply(.pardo(name ?? "\(_file):\(_line)", ParameterizedClosureFn(param,fn), [])) } // Single Output - func pardo(_ name: String = "\(#file):\(#line)",_ fn: F, + func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F, _ o0:PCollection) { - self.apply(.pardo(name, fn, [AnyPCollection(o0)])) + self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [AnyPCollection(o0)])) } - func pardo(_ name: String = "\(#file):\(#line)", + func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line, _ fn: @Sendable @escaping (PCollection.Stream,PCollection.Stream) async throws -> Void) -> (PCollection) { let output = PCollection() - self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output)])) + self.apply(.pardo(name ?? "\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output)])) return output } - func pardo(_ name: String = "\(#file):\(#line)",_ param: Param, + func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection.Stream,PCollection.Stream) async throws -> Void) -> (PCollection) { let output = PCollection() - self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output)])) + self.apply(.pardo(name ?? "\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output)])) return output } // Two Outputs - func pardo(_ name: String = "\(#file):\(#line)",_ fn: F, + func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F, _ o0:PCollection,_ o1:PCollection) { - self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1)])) + self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1)])) } - func pardo(_ name: String = "\(#file):\(#line)", + func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line, _ fn: @Sendable @escaping (PCollection.Stream,PCollection.Stream,PCollection.Stream) async throws -> Void) -> (PCollection,PCollection) { let output = (PCollection(),PCollection()) - self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + self.apply(.pardo(name ?? "\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) return output } - func pardo(_ name: String = "\(#file):\(#line)",_ param: Param, + func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection.Stream,PCollection.Stream,PCollection.Stream) async throws -> Void) -> (PCollection,PCollection) { let output = (PCollection(),PCollection()) - self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + self.apply(.pardo(name ?? "\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) return output } // Three Outputs - func pardo(_ name: String = "\(#file):\(#line)",_ fn: F, + func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F, _ o0:PCollection,_ o1:PCollection,_ o2:PCollection) { - self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)])) + self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)])) } - func pardo(_ name: String = "\(#file):\(#line)", + func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line, _ fn: @Sendable @escaping (PCollection.Stream,PCollection.Stream,PCollection.Stream,PCollection.Stream) async throws -> Void) -> (PCollection,PCollection,PCollection) { let output = (PCollection(),PCollection(),PCollection()) - self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + self.apply(.pardo(name ?? "\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) return output } - func pardo(_ name: String = "\(#file):\(#line)",_ param: Param, + func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection.Stream,PCollection.Stream,PCollection.Stream,PCollection.Stream) async throws -> Void) -> (PCollection,PCollection,PCollection) { let output = (PCollection(),PCollection(),PCollection()) - self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + self.apply(.pardo(name ?? "\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) return output } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift index 642e76548ad1..3ef57ebfd35c 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift @@ -18,8 +18,8 @@ /// Basic reducers public extension PCollection { - func reduce(name:String = "\(#file):\(#line)",into:Result,_ accumulator: @Sendable @escaping (V,inout Result) -> Void) -> PCollection> where Of == KV { - return pardo(name,into) { initialValue,input,output in + func reduce(name:String? = nil,_file:String=#fileID,_line:Int=#line,into:Result,_ accumulator: @Sendable @escaping (V,inout Result) -> Void) -> PCollection> where Of == KV { + return pardo(name,_file:_file,_line:_line,into) { initialValue,input,output in for await (kv,ts,w) in input { var result = initialValue for v in kv.values { @@ -34,7 +34,7 @@ public extension PCollection { /// Convenience functions public extension PCollection { - func sum() -> PCollection> where Of == KV { - return reduce(into: 0,{ a,b in b = b + a }) + func sum(_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection> where Of == KV { + return reduce(name:name,_file:_file,_line:_line,into: 0,{ a,b in b = b + a }) } } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift index ed98a8577e6e..8cf2b46cfeab 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift @@ -19,8 +19,8 @@ /// Basic grouping functionality /// public extension PCollection { - func groupBy(name: String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection> { - return map(name:name,fn) + func groupBy(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection> { + return map(name,_file:_file,_line:_line,fn) .groupByKey() } }