-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
pipeline_options.py
1899 lines (1718 loc) · 71.9 KB
/
pipeline_options.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Pipeline options obtained from command line parsing."""
# pytype: skip-file
import argparse
import json
import logging
import os
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import Type
from typing import TypeVar
import apache_beam as beam
from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.options.value_provider import ValueProvider
from apache_beam.transforms.display import HasDisplayData
from apache_beam.utils import proto_utils
# Names exported by `from <this module> import *`.
__all__ = [
'PipelineOptions',
'StandardOptions',
'TypeOptions',
'DirectOptions',
'GoogleCloudOptions',
'AzureOptions',
'HadoopFileSystemOptions',
'WorkerOptions',
'DebugOptions',
'ProfilingOptions',
'SetupOptions',
'TestOptions',
'S3Options'
]
# Type variable bound to PipelineOptions; PipelineOptions.view_as() uses it so
# that the annotated return type is the subclass that was passed in.
PipelineOptionsT = TypeVar('PipelineOptionsT', bound='PipelineOptions')
# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)
# Map defined with option names to flag names for boolean options
# that have a destination(dest) in parser.add_argument() different
# from the flag name and whose default value is `None`.
# PipelineOptions.from_dictionary() consults this map when such an option is
# given as False, and emits the mapped flag (e.g. --no_use_public_ips).
_FLAG_THAT_SETS_FALSE_VALUE = {'use_public_ips': 'no_use_public_ips'}
def _static_value_provider_of(value_type):
"""Helper function to plug a ValueProvider into argparse.
Args:
value_type: the type of the value. Since the type param of argparse's
add_argument will always be ValueProvider, we need to
preserve the type of the actual value.
Returns:
A partially constructed StaticValueProvider in the form of a function.
"""
def _f(value):
_f.__name__ = value_type.__name__
return StaticValueProvider(value_type, value)
return _f
class _BeamArgumentParser(argparse.ArgumentParser):
"""An ArgumentParser that supports ValueProvider options.
Example Usage::
class TemplateUserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--vp_arg1', default='start')
parser.add_value_provider_argument('--vp_arg2')
parser.add_argument('--non_vp_arg')
"""
def add_value_provider_argument(self, *args, **kwargs):
"""ValueProvider arguments can be either of type keyword or positional.
At runtime, even positional arguments will need to be supplied in the
key/value form.
"""
# Extract the option name from positional argument ['pos_arg']
assert args and len(args[0]) >= 1
if args[0][0] != '-':
option_name = args[0]
if kwargs.get('nargs') is None: # make them optionally templated
kwargs['nargs'] = '?'
else:
# or keyword arguments like [--kw_arg, -k, -w] or [--kw-arg]
option_name = [i.replace('--', '') for i in args if i[:2] == '--'][0]
# reassign the type to make room for using
# StaticValueProvider as the type for add_argument
value_type = kwargs.get('type') or str
kwargs['type'] = _static_value_provider_of(value_type)
# reassign default to default_value to make room for using
# RuntimeValueProvider as the default for add_argument
default_value = kwargs.get('default')
kwargs['default'] = RuntimeValueProvider(
option_name=option_name,
value_type=value_type,
default_value=default_value)
# have add_argument do most of the work
self.add_argument(*args, **kwargs)
# The argparse package by default tries to autocomplete option names. This
# results in an "ambiguous option" error from argparse when an unknown option
# matching multiple known ones are used. This suppresses that behavior.
def error(self, message):
if message.startswith('ambiguous option: '):
return
super().error(message)
class _DictUnionAction(argparse.Action):
"""
argparse Action take union of json loads values. If a key is specified in more
than one of the values, the last value takes precedence.
"""
def __call__(self, parser, namespace, values, option_string=None):
if not hasattr(namespace,
self.dest) or getattr(namespace, self.dest) is None:
setattr(namespace, self.dest, {})
getattr(namespace, self.dest).update(values)
class PipelineOptions(HasDisplayData):
  """This class and subclasses are used as containers for command line options.

  These classes are wrappers over the standard argparse Python module
  (see https://docs.python.org/3/library/argparse.html). To define one option
  or a group of options, create a subclass from PipelineOptions.

  Example Usage::

    class XyzOptions(PipelineOptions):

      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument('--abc', default='start')
        parser.add_argument('--xyz', default='end')

  The arguments for the add_argument() method are exactly the ones
  described in the argparse public documentation.

  Pipeline objects require an options object during initialization.
  This is obtained simply by initializing an options class as defined above.

  Example Usage::

    p = Pipeline(options=XyzOptions())
    if p.options.xyz == 'end':
      raise ValueError('Option xyz has an invalid value.')

  Instances of PipelineOptions or any of its subclass have access to values
  defined by other PipelineOption subclasses (see get_all_options()), and
  can be converted to an instance of another PipelineOptions subclass
  (see view_as()). All views share the underlying data structure that stores
  option key-value pairs.

  By default the options classes will use command line arguments to initialize
  the options.
  """
  def __init__(self, flags: Optional[Sequence[str]] = None, **kwargs) -> None:
    """Initialize an options class.

    The initializer will traverse all subclasses, add all their argparse
    arguments and then parse the command line specified by flags or by default
    the one obtained from sys.argv.

    The subclasses of PipelineOptions do not need to redefine __init__.

    Args:
      flags: An iterable of command line arguments to be used. If not specified
        then sys.argv will be used as input for parsing arguments.

      **kwargs: Add overrides for arguments passed in flags. For overrides
                of arguments, please pass the `option names` instead of
                flag names.
                Option names: These are defined as dest in the
                parser.add_argument() for each flag. Passing flags
                like {no_use_public_ips: True}, for which the dest is
                defined to a different flag name in the parser,
                would be discarded. Instead, pass the dest of
                the flag (dest of no_use_public_ips is use_public_ips).

    Raises:
      ValueError: If flags is a single string rather than an iterable of
        strings.
    """
    # Initializing logging configuration in case the user did not set it up.
    logging.basicConfig()

    # self._flags stores a list of not yet parsed arguments, typically,
    # command-line flags. This list is shared across different views.
    # See: view_as().
    if isinstance(flags, str):
      # Unfortunately a single str passes the Iterable[str] test, as it is
      # an iterable of single characters.  This is almost certainly not the
      # intent...
      raise ValueError(
          "Flags must be an iterable of strings, not a single string.")
    self._flags = flags

    # Build parser that will parse options recognized by the [sub]class of
    # PipelineOptions whose object is being instantiated.
    parser = _BeamArgumentParser()
    for cls in type(self).mro():
      if cls == PipelineOptions:
        break
      elif '_add_argparse_args' in cls.__dict__:
        cls._add_argparse_args(parser)  # type: ignore

    # The _visible_options attribute will contain options that were recognized
    # by the parser.
    self._visible_options, _ = parser.parse_known_args(flags)

    # self._all_options is initialized with overrides to flag values,
    # provided in kwargs, and will store key-value pairs for options recognized
    # by current PipelineOptions [sub]class and its views that may be created.
    # See: view_as().
    # This dictionary is shared across different views, and is lazily updated
    # as each new views are created.
    # Users access this dictionary store via __getattr__ / __setattr__ methods.
    self._all_options = kwargs

    # Initialize values of keys defined by this class.
    for option_name in self._visible_option_list():
      # Note that options specified in kwargs will not be overwritten.
      if option_name not in self._all_options:
        self._all_options[option_name] = getattr(
            self._visible_options, option_name)

  def __getstate__(self):
    """Returns the instance state to serialize.

    When impersonate_service_account is set, the returned state is a copy in
    which that option is None; otherwise it is the instance ``__dict__``.
    """
    # The impersonate_service_account option must be used only at submission of
    # a Beam job. However, Beam IOs might store pipeline options
    # within transform implementation that becomes serialized in RunnerAPI,
    # causing this option to be inadvertently used at runtime.
    # This serialization hook removes it.
    if self.view_as(GoogleCloudOptions).impersonate_service_account:
      dict_copy = dict(self.__dict__)
      # Copy the options dict too, so the live (shared) one keeps its value.
      dict_copy['_all_options'] = dict(dict_copy['_all_options'])
      dict_copy['_all_options']['impersonate_service_account'] = None
      return dict_copy
    else:
      return self.__dict__

  @classmethod
  def _add_argparse_args(cls, parser: _BeamArgumentParser) -> None:
    # Override this in subclasses to provide options.
    pass

  @classmethod
  def from_dictionary(cls, options):
    """Returns a PipelineOptions from a dictionary of arguments.

    Args:
      options: Dictionary of argument value pairs.

    Returns:
      A PipelineOptions object representing the given arguments.
    """
    flags = []
    for k, v in options.items():
      # Note: If a boolean flag is True in the dictionary,
      # implicitly the method assumes the boolean flag is
      # specified as a command line argument. If the
      # boolean flag is False, this method simply discards them.
      # Eg: {no_auth: True} is similar to python your_file.py --no_auth
      # {no_auth: False} is similar to python your_file.py.
      if isinstance(v, bool):
        if v:
          flags.append('--%s' % k)
        elif k in _FLAG_THAT_SETS_FALSE_VALUE:
          # Capture overriding flags, which have a different dest
          # from the flag name defined in the parser.add_argument
          # Eg: no_use_public_ips, which has the dest=use_public_ips
          # different from flag name
          flag_that_disables_the_option = (_FLAG_THAT_SETS_FALSE_VALUE[k])
          flags.append('--%s' % flag_that_disables_the_option)
      elif isinstance(v, list):
        # One flag occurrence per list element.
        for i in v:
          flags.append('--%s=%s' % (k, i))
      elif isinstance(v, dict):
        flags.append('--%s=%s' % (k, json.dumps(v)))
      elif v is None:
        # Don't process None type args here, they will be treated
        # as strings when parsed by BeamArgumentParser..
        _LOGGER.warning('Not setting flag with value None: %s', k)
      else:
        flags.append('--%s=%s' % (k, v))

    return cls(flags)

  def get_all_options(
      self,
      drop_default=False,
      add_extra_args_fn: Optional[Callable[[_BeamArgumentParser], None]] = None,
      retain_unknown_options=False) -> Dict[str, Any]:
    """Returns a dictionary of all defined arguments.

    Returns a dictionary of all defined arguments (arguments that are defined in
    any subclass of PipelineOptions) into a dictionary.

    Args:
      drop_default: If set to true, options that are equal to their default
        values, are not returned as part of the result dictionary.
      add_extra_args_fn: Callback to populate additional arguments, can be used
        by runner to supply otherwise unknown args.
      retain_unknown_options: If set to true, options not recognized by any
        known pipeline options class will still be included in the result. If
        set to false, they will be discarded.

    Returns:
      Dictionary of all args and values.
    """

    # TODO(https://github.com/apache/beam/issues/18197): PipelineOption
    # sub-classes in the main session might be repeated. Pick last unique
    # instance of each subclass to avoid conflicts.
    subset = {}
    parser = _BeamArgumentParser()
    for cls in PipelineOptions.__subclasses__():
      subset[str(cls)] = cls
    for cls in subset.values():
      cls._add_argparse_args(parser)  # pylint: disable=protected-access
    if add_extra_args_fn:
      add_extra_args_fn(parser)

    known_args, unknown_args = parser.parse_known_args(self._flags)
    if retain_unknown_options:
      if unknown_args:
        _LOGGER.warning(
            'Unknown pipeline options received: %s. Ignore if flags are '
            'used for internal purposes.',
            ','.join(unknown_args))

      # Option strings registered during this scan. argparse raises a
      # conflict error when the same option string is added twice, which would
      # otherwise happen whenever an unknown flag is repeated.
      registered = set()

      def _register(flag, **add_argument_kwargs):
        if flag not in registered:
          registered.add(flag)
          parser.add_argument(flag, **add_argument_kwargs)

      i = 0
      while i < len(unknown_args):
        arg = unknown_args[i]
        # End of argument parsing.
        if arg == '--':
          break
        # Treat all unary flags as booleans, and all binary argument values as
        # strings.
        if not arg.startswith('-'):
          i += 1
          continue
        if i + 1 >= len(unknown_args) or unknown_args[i + 1].startswith('-'):
          # No separate value follows: either a bare flag or --flag=value.
          split = arg.split('=', 1)
          if len(split) == 1:
            _register(arg, action='store_true')
          else:
            _register(split[0], type=str)
          i += 1
        elif arg.startswith('--'):
          # --flag value: consume the flag and its value.
          _register(arg, type=str)
          i += 2
        else:
          # skip all binary flags used with '-' and not '--'.
          # ex: using -f instead of --f (or --flexrs_goal) will prevent
          # argument validation before job submission and can be incorrectly
          # submitted to job.
          _LOGGER.warning(
              "Discarding flag %s, single dash flags are not allowed.", arg)
          i += 2
      # Re-parse now that the previously unknown flags are registered.
      parsed_args, _ = parser.parse_known_args(self._flags)
    else:
      if unknown_args:
        _LOGGER.warning("Discarding unparseable args: %s", unknown_args)
      parsed_args = known_args
    result = vars(parsed_args)

    overrides = self._all_options.copy()
    # Apply the overrides if any
    for k in list(result):
      overrides.pop(k, None)
      if k in self._all_options:
        result[k] = self._all_options[k]
      if (drop_default and parser.get_default(k) == result[k] and
          not isinstance(parser.get_default(k), ValueProvider)):
        del result[k]

    # Whatever is left in overrides did not match any parsed option.
    if overrides:
      if retain_unknown_options:
        result.update(overrides)
      else:
        _LOGGER.warning("Discarding invalid overrides: %s", overrides)

    return result

  def to_runner_api(self):
    """Packs the non-default, non-None options into a Struct.

    Each option is keyed by the URN ``beam:option:<name>:v1``.
    """
    def to_struct_value(o):
      if isinstance(o, (bool, int, str)):
        return o
      elif isinstance(o, (tuple, list)):
        return [to_struct_value(e) for e in o]
      elif isinstance(o, dict):
        return {str(k): to_struct_value(v) for k, v in o.items()}
      else:
        return str(o)  # Best effort.

    return proto_utils.pack_Struct(
        **{
            f'beam:option:{k}:v1': to_struct_value(v)
            for (k, v) in self.get_all_options(
                drop_default=True, retain_unknown_options=True).items()
            if v is not None
        })

  @classmethod
  def from_runner_api(cls, proto_options):
    """Builds options from a mapping keyed by ``beam:option:<name>:v1`` URNs.

    The values are passed to the constructor as keyword overrides.
    """
    prefix = 'beam:option:'
    suffix = ':v1'

    def from_urn(key):
      assert key.startswith(prefix)
      assert key.endswith(suffix)
      return key[len(prefix):-len(suffix)]

    return cls(
        **{from_urn(key): value
           for (key, value) in proto_options.items()})

  def display_data(self):
    """Returns all non-default options, including unknown ones."""
    return self.get_all_options(drop_default=True, retain_unknown_options=True)

  def view_as(self, cls: Type[PipelineOptionsT]) -> PipelineOptionsT:
    """Returns a view of current object as provided PipelineOption subclass.

    Example Usage::

      options = PipelineOptions(['--runner', 'Direct', '--streaming'])
      standard_options = options.view_as(StandardOptions)
      if standard_options.streaming:
        # ... start a streaming job ...

    Note that options objects may have multiple views, and modifications
    of values in any view-object will apply to current object and other
    view-objects.

    Args:
      cls: PipelineOptions class or any of its subclasses.

    Returns:
      An instance of cls that is initialized using options contained in current
      object.

    """
    view = cls(self._flags)

    for option_name in view._visible_option_list():
      # Initialize values of keys defined by a cls.
      #
      # Note that we do initialization only once per key to make sure that
      # values in _all_options dict are not-recreated with each new view.
      # This is important to make sure that values of multi-options keys are
      # backed by the same list across multiple views, and that any overrides of
      # pipeline options already stored in _all_options are preserved.
      if option_name not in self._all_options:
        self._all_options[option_name] = getattr(
            view._visible_options, option_name)
    # Note that views will still store _all_options of the source object.
    view._all_options = self._all_options
    return view

  def _visible_option_list(self) -> List[str]:
    """Returns the sorted public attribute names of the parsed namespace."""
    return sorted(
        option for option in dir(self._visible_options) if option[0] != '_')

  def __dir__(self) -> List[str]:
    return sorted(
        dir(type(self)) + list(self.__dict__) + self._visible_option_list())

  def __getattr__(self, name):
    # Special methods which may be accessed before the object is
    # fully constructed (e.g. in unpickling).
    if name[:2] == name[-2:] == '__':
      return object.__getattribute__(self, name)
    elif name in ('_flags', '_all_options', '_visible_options'):
      # __getattr__ only runs after normal lookup has failed, so the internal
      # state is not set yet. Fail fast: falling through would call
      # _visible_option_list(), which reads self._visible_options and would
      # re-enter this method until a RecursionError.
      raise AttributeError(
          "'%s' object has no attribute '%s'" % (type(self).__name__, name))
    elif name in self._visible_option_list():
      return self._all_options[name]
    else:
      raise AttributeError(
          "'%s' object has no attribute '%s'" % (type(self).__name__, name))

  def __setattr__(self, name, value):
    # Internal state lives on the instance; every other assignable name must
    # be a visible option and is written to the shared options dict.
    if name in ('_flags', '_all_options', '_visible_options'):
      super().__setattr__(name, value)
    elif name in self._visible_option_list():
      self._all_options[name] = value
    else:
      raise AttributeError(
          "'%s' object has no attribute '%s'" % (type(self).__name__, name))

  def __str__(self):
    return '%s(%s)' % (
        type(self).__name__,
        ', '.join(
            '%s=%s' % (option, getattr(self, option))
            for option in self._visible_option_list()))
class StandardOptions(PipelineOptions):
  """Options common to all pipelines: runner selection, streaming mode,
  resource hints, label uniqueness and wait behavior."""

  # Runner name reported in the --runner help text as the default.
  DEFAULT_RUNNER = 'DirectRunner'

  # Fully qualified class paths of the runners listed in the --runner help.
  ALL_KNOWN_RUNNERS = (
      'apache_beam.runners.dataflow.dataflow_runner.DataflowRunner',
      'apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner',
      'apache_beam.runners.direct.direct_runner.DirectRunner',
      'apache_beam.runners.direct.direct_runner.SwitchingDirectRunner',
      'apache_beam.runners.interactive.interactive_runner.InteractiveRunner',
      'apache_beam.runners.portability.flink_runner.FlinkRunner',
      'apache_beam.runners.portability.portable_runner.PortableRunner',
      'apache_beam.runners.portability.prism_runner.PrismRunner',
      'apache_beam.runners.portability.spark_runner.SparkRunner',
      'apache_beam.runners.test.TestDirectRunner',
      'apache_beam.runners.test.TestDataflowRunner',
  )

  # Short names: the last dotted component of each path above.
  KNOWN_RUNNER_NAMES = [path.split('.')[-1] for path in ALL_KNOWN_RUNNERS]

  @classmethod
  def _add_argparse_args(cls, parser):
    """Registers the standard options on ``parser``."""
    parser.add_argument(
        '--runner',
        help=(
            'Pipeline runner used to execute the workflow. Valid values are '
            'one of %s, or the fully qualified name of a PipelineRunner '
            'subclass. If unspecified, defaults to %s.' %
            (', '.join(cls.KNOWN_RUNNER_NAMES), cls.DEFAULT_RUNNER)))
    # Whether to enable streaming mode.
    parser.add_argument(
        '--streaming',
        default=False,
        action='store_true',
        help='Whether to enable streaming mode.')

    # Both spellings append to the same 'resource_hints' list.
    parser.add_argument(
        '--resource_hint',
        '--resource_hints',
        dest='resource_hints',
        action='append',
        default=[],
        help=(
            'Resource hint to set in the pipeline execution environment. '
            'Hints specified via this option override hints specified '
            'at transform level. Interpretation of hints is defined by '
            'Beam runners.'))

    parser.add_argument(
        '--auto_unique_labels',
        default=False,
        action='store_true',
        help='Whether to automatically generate unique transform labels '
        'for every transform. The default behavior is to raise an '
        'exception if a transform is created with a non-unique label. '
        'Using --auto_unique_labels could cause data loss when '
        'updating a pipeline or reloading the job state. '
        'This is not recommended for streaming jobs.')

    parser.add_argument(
        '--no_wait_until_finish',
        default=False,
        action='store_true',
        help='By default, the "with" statement waits for the job to '
        'complete. Set this flag to bypass this behavior and continue '
        'execution immediately')
class StreamingOptions(PipelineOptions):
  """Defines the --update_compatibility_version option."""
  @classmethod
  def _add_argparse_args(cls, parser):
    """Registers --update_compatibility_version (default None) on ``parser``."""
    compatibility_help = (
        'Attempt to produce a pipeline compatible with the given prior '
        'version of the Beam SDK. '
        'See for example, https://cloud.google.com/dataflow/docs/guides/'
        'updating-a-pipeline')
    parser.add_argument(
        '--update_compatibility_version',
        help=compatibility_help,
        default=None)
class CrossLanguageOptions(PipelineOptions):
  """Defines the --beam_services and --use_transform_service options."""
  @staticmethod
  def _beam_services_from_enviroment():
    """Returns the JSON-decoded BEAM_SERVICE_OVERRIDES environment variable.

    An unset or empty variable decodes as an empty mapping.
    """
    raw_overrides = os.environ.get('BEAM_SERVICE_OVERRIDES')
    return json.loads(raw_overrides if raw_overrides else '{}')

  @classmethod
  def _add_argparse_args(cls, parser):
    """Registers the cross-language options on ``parser``."""
    beam_services_help = (
        'For convenience, Beam provides the ability to automatically '
        'download and start various services (such as expansion services) '
        'used at pipeline construction and execution. These services are '
        'identified by gradle target. This option provides the ability to '
        'use pre-started services or non-default pre-existing artifacts to '
        'start the given service. '
        'Should be a json mapping of gradle build targets to pre-built '
        'artifacts (e.g. jar files) or expansion endpoints '
        '(e.g. host:port). Defaults to the value of BEAM_SERVICE_OVERRIDES '
        'from the environment.')
    transform_service_help = (
        'Use the Docker-composed-based transform service when expanding '
        'cross-language transforms.')

    # A value given on the command line is layered over the environment
    # overrides, so flag entries win on key collisions.
    parser.add_argument(
        '--beam_services',
        type=lambda s: {
            **cls._beam_services_from_enviroment(), **json.loads(s)
        },
        default=cls._beam_services_from_enviroment(),
        help=beam_services_help)
    parser.add_argument(
        '--use_transform_service',
        default=False,
        action='store_true',
        help=transform_service_help)
def additional_option_ptransform_fn():
  """Sets beam.transforms.ptransform.ptransform_fn_typehints_enabled to True."""
  beam.transforms.ptransform.ptransform_fn_typehints_enabled = True


# Optional type checks that aren't enabled by default.
# Maps each feature name to the zero-argument callable that enables it.
additional_type_checks: Dict[str, Callable[[], None]] = {
    'ptransform_fn': additional_option_ptransform_fn,
}


def enable_all_additional_type_checks():
  """Same as passing --type_check_additional=all."""
  for enable_check in additional_type_checks.values():
    enable_check()
class TypeOptions(PipelineOptions):
  """Options controlling type checking, key coders and trigger safety."""
  @classmethod
  def _add_argparse_args(cls, parser):
    """Registers the type-checking related options on ``parser``."""
    # TODO(laolu): Add a type inferencing option here once implemented.
    parser.add_argument(
        '--type_check_strictness',
        default='DEFAULT_TO_ANY',
        choices=['ALL_REQUIRED', 'DEFAULT_TO_ANY'],
        help='The level of exhaustive manual type-hint '
        'annotation required')
    parser.add_argument(
        '--type_check_additional',
        default='',
        help='Comma separated list of additional type checking features to '
        'enable. Options: all, ptransform_fn. For details see: '
        'https://beam.apache.org/documentation/sdks/python-type-safety/')
    # store_false without an explicit default: pipeline_type_check is True
    # unless --no_pipeline_type_check is passed.
    parser.add_argument(
        '--no_pipeline_type_check',
        dest='pipeline_type_check',
        action='store_false',
        help='Disable type checking at pipeline construction '
        'time')
    parser.add_argument(
        '--runtime_type_check',
        default=False,
        action='store_true',
        help='Enable type checking at pipeline execution '
        'time. NOTE: only supported with the '
        'DirectRunner')
    parser.add_argument(
        '--performance_runtime_type_check',
        default=False,
        action='store_true',
        help='Enable faster type checking via sampling at pipeline execution '
        'time. NOTE: only supported with portable runners '
        '(including the DirectRunner)')
    parser.add_argument(
        '--allow_non_deterministic_key_coders',
        default=False,
        action='store_true',
        help='Use non-deterministic coders (such as pickling) for key-grouping '
        'operations such as GroupByKey.  This is unsafe, as runners may group '
        'keys based on their encoded bytes, but is available for backwards '
        'compatibility. See BEAM-11719.')
    parser.add_argument(
        '--allow_unsafe_triggers',
        default=False,
        action='store_true',
        help='Allow the use of unsafe triggers. Unsafe triggers have the '
        'potential to cause data loss due to finishing and/or never having '
        'their condition met. Some operations, such as GroupByKey, disallow '
        'this. This exists for cases where such loss is acceptable and for '
        'backwards compatibility. See BEAM-9487.')

  def validate(self, unused_validator):
    """Enables the requested additional type checks and reports bad names.

    Args:
      unused_validator: Not used.

    Returns:
      A list of error message strings; empty when everything is valid.
    """
    errors = []
    # Compare the major version numerically: a plain string comparison would
    # order e.g. '10.0.0' before '3'.
    major_version = beam.version.__version__.split('.', 1)[0]
    if major_version.isdigit() and int(major_version) >= 3:
      errors.append(
          'Update --type_check_additional default to include all '
          'available additional checks at Beam 3.0 release time.')
    keys = self.type_check_additional.split(',')

    for key in keys:
      if not key:
        # Empty entries (including the empty default) are ignored.
        continue
      elif key == 'all':
        enable_all_additional_type_checks()
      elif key in additional_type_checks:
        additional_type_checks[key]()
      else:
        errors.append('Unrecognized --type_check_additional feature: %s' % key)

    return errors
class DirectOptions(PipelineOptions):
  """DirectRunner-specific execution options."""
  @classmethod
  def _add_argparse_args(cls, parser):
    """Registers the DirectRunner options on ``parser``."""
    # store_false without an explicit default: the option is True unless
    # --no_direct_runner_use_stacked_bundle is passed.
    parser.add_argument(
        '--no_direct_runner_use_stacked_bundle',
        action='store_false',
        dest='direct_runner_use_stacked_bundle',
        help='DirectRunner uses stacked WindowedValues within a Bundle for '
        'memory optimization. Set --no_direct_runner_use_stacked_bundle to '
        'avoid it.')
    parser.add_argument(
        '--direct_runner_bundle_repeat',
        type=int,
        default=0,
        help='replay every bundle this many extra times, for profiling '
        'and debugging')
    parser.add_argument(
        '--direct_num_workers',
        type=int,
        default=1,
        help='number of parallel running workers.')
    parser.add_argument(
        '--direct_running_mode',
        default='in_memory',
        choices=['in_memory', 'multi_threading', 'multi_processing'],
        help='Workers running environment.')
    parser.add_argument(
        '--direct_embed_docker_python',
        default=False,
        action='store_true',
        dest='direct_embed_docker_python',
        help='DirectRunner uses the embedded Python environment when '
        'the default Python docker environment is specified.')
    parser.add_argument(
        '--direct_test_splits',
        default={},
        type=json.loads,
        help='Split test configuration of the json form '
        '{"step_name": {"timings": [...], "fractions": [...]}, ...} '
        'where step_name is the name of a step controlling the stage to which '
        'splits will be sent, timings is a list of floating-point times '
        '(in seconds) at which the split requests will be sent, and '
        'fractions is a corresponding list of floating points to use in the '
        'split requests themselves.')
class GoogleCloudOptions(PipelineOptions):
"""Google Cloud Dataflow service execution options."""
BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
COMPUTE_API_SERVICE = 'compute.googleapis.com'
STORAGE_API_SERVICE = 'storage.googleapis.com'
DATAFLOW_ENDPOINT = 'https://dataflow.googleapis.com'
OAUTH_SCOPES = [
'https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/devstorage.full_control',
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/datastore',
'https://www.googleapis.com/auth/spanner.admin',
'https://www.googleapis.com/auth/spanner.data'
]
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--dataflow_endpoint',
default=cls.DATAFLOW_ENDPOINT,
help=(
'The URL for the Dataflow API. If not set, the default public URL '
'will be used.'))
# Remote execution must check that this option is not None.
parser.add_argument(
'--project',
default=None,
help='Name of the Cloud project owning the Dataflow '
'job.')
# Remote execution must check that this option is not None.
parser.add_argument(
'--job_name', default=None, help='Name of the Cloud Dataflow job.')
# Remote execution must check that this option is not None.
parser.add_argument(
'--staging_location',
default=None,
help='GCS path for staging code packages needed by '
'workers.')
# Remote execution must check that this option is not None.
# If staging_location is not set, it defaults to temp_location.
parser.add_argument(
'--temp_location',
default=None,
help='GCS path for saving temporary workflow jobs.')
# The Google Compute Engine region for creating Dataflow jobs. See
# https://cloud.google.com/compute/docs/regions-zones/regions-zones for a
# list of valid options.
parser.add_argument(
'--region',
default=None,
help='The Google Compute Engine region for creating '
'Dataflow job.')
parser.add_argument(
'--service_account_email',
default=None,
help='Identity to run virtual machines as.')
parser.add_argument(
'--no_auth',
dest='no_auth',
action='store_true',
default=False,
help='Skips authorizing credentials with Google Cloud.')
# Option to run templated pipelines
parser.add_argument(
'--template_location',
default=None,
help='Save job to specified local or GCS location.')
parser.add_argument(
'--label',
'--labels',
dest='labels',
action='append',
default=None,
help='Labels to be applied to this Dataflow job. '
'Labels are key value pairs separated by = '
'(e.g. --label key=value) or '
'(--labels=\'{ "key": "value", "mass": "1_3kg", "count": "3" }\').')
parser.add_argument(
'--update',
default=False,
action='store_true',
help='Update an existing streaming Cloud Dataflow job. '
'See https://cloud.google.com/dataflow/docs/guides/'
'updating-a-pipeline')
parser.add_argument(
'--transform_name_mapping',
default=None,
type=json.loads,
help='The transform mapping that maps the named '
'transforms in your prior pipeline code to names '
'in your replacement pipeline code.'
'See https://cloud.google.com/dataflow/docs/guides/'
'updating-a-pipeline')
parser.add_argument(
'--enable_streaming_engine',
default=False,
action='store_true',
help='Enable Windmill Service for this Dataflow job. ')
parser.add_argument(
'--dataflow_kms_key',
default=None,
help='Set a Google Cloud KMS key name to be used in '
'Dataflow state operations (GBK, Streaming).')
parser.add_argument(
'--create_from_snapshot',
default=None,
help='The snapshot from which the job should be created.')
parser.add_argument(
'--flexrs_goal',
default=None,
choices=['COST_OPTIMIZED', 'SPEED_OPTIMIZED'],
help='Set the Flexible Resource Scheduling mode')
parser.add_argument(
'--dataflow_service_option',
'--dataflow_service_options',
dest='dataflow_service_options',
action='append',
default=None,
help=(
'Options to configure the Dataflow service. These '
'options decouple service side feature availability '
'from the Apache Beam release cycle.'
'Note: If set programmatically, must be set as a '
'list of strings'))
parser.add_argument(
'--enable_hot_key_logging',
default=False,
action='store_true',
help='When true, will enable the direct logging of any detected hot '
'keys into Cloud Logging. Warning: this will log the literal key as an '
'unobfuscated string.')
parser.add_argument(
'--enable_artifact_caching',
default=False,
action='store_true',
help='When true, artifacts will be cached across job submissions in '
'the GCS staging bucket')
parser.add_argument(
'--impersonate_service_account',
default=None,
help='All API requests will be made as the given service account or '
'target service account in an impersonation delegation chain '
'instead of the currently selected account. You can specify '
'either a single service account as the impersonator, or a '
'comma-separated list of service accounts to create an '
'impersonation delegation chain.')
parser.add_argument(
'--gcp_oauth_scope',
'--gcp_oauth_scopes',
dest='gcp_oauth_scopes',
action='append',
default=cls.OAUTH_SCOPES,
help=(
'Controls the OAuth scopes that will be requested when creating '
'GCP credentials. Note: If set programmatically, must be set as a '
'list of strings'))
parser.add_argument(
'--enable_bucket_read_metric_counter',
default=False,
action='store_true',
help='Create metrics reporting the approximate number of bytes read '
'per bucket.')
parser.add_argument(
'--enable_bucket_write_metric_counter',
default=False,
action='store_true',
help=
'Create metrics reporting the approximate number of bytes written per '
'bucket.')
parser.add_argument(
'--no_gcsio_throttling_counter',
default=False,
action='store_true',
help='Throttling counter in GcsIO is enabled by default. Set '
'--no_gcsio_throttling_counter to avoid it.')
parser.add_argument(
'--enable_gcsio_blob_generation',
default=False,
action='store_true',
help='Use blob generation when mutating blobs in GCSIO to '
'mitigate race conditions at the cost of more HTTP requests.')
def _create_default_gcs_bucket(self):
  """Returns the ``gs://`` URL of the default GCS bucket, or None.

  The bucket is obtained from ``gcsio.get_or_create_default_gcs_bucket``.
  None is returned when the GCP IO module cannot be imported (a warning is
  logged in that case) or when no bucket is returned.
  """
  try:
    from apache_beam.io.gcp import gcsio
  except ImportError:
    _LOGGER.warning('Unable to create default GCS bucket.')
    return None

  default_bucket = gcsio.get_or_create_default_gcs_bucket(self)
  if not default_bucket:
    return None
  return 'gs://%s' % default_bucket.id
def _warn_if_soft_delete_policy_enabled(self, arg_name):
  """Logs a warning if the bucket named by an option uses soft delete.

  Args:
    arg_name: Name of the option on this object whose value is the GCS path
      to inspect; a missing attribute is treated as None.
  """
  # The check below needs internet access to reach GCS, so it is skipped
  # entirely in dry-run mode.
  test_options = self.view_as(TestOptions)
  if test_options.dry_run:
    return

  soft_delete_warning = (
      "Bucket specified in %s has soft-delete policy enabled."
      " To avoid being billed for unnecessary storage costs, turn"
      " off the soft delete feature on buckets that your Dataflow"
      " jobs use for temporary and staging storage. For more"
      " information, see"
      " https://cloud.google.com/storage/docs/use-soft-delete"
      "#remove-soft-delete-policy.")
  bucket_path = getattr(self, arg_name, None)
  try:
    from apache_beam.io.gcp import gcsio
    gcs_client = gcsio.GcsIO()
    if gcs_client.is_soft_delete_enabled(bucket_path):
      _LOGGER.warning(soft_delete_warning % arg_name)
  except ImportError:
    _LOGGER.warning('Unable to check soft delete policy due to import error.')
# If either temp or staging location has an issue, we use the valid one for
# both locations. If both are bad we return an error.
def _handle_temp_and_staging_locations(self, validator):
temp_errors = validator.validate_gcs_path(self, 'temp_location')
staging_errors = validator.validate_gcs_path(self, 'staging_location')
if temp_errors and not staging_errors:
setattr(self, 'temp_location', getattr(self, 'staging_location'))
self._warn_if_soft_delete_policy_enabled('staging_location')
return []
elif staging_errors and not temp_errors:
setattr(self, 'staging_location', getattr(self, 'temp_location'))