This repository has been archived by the owner on Aug 13, 2018. It is now read-only.

Permission Denied when writing to HDFS #132

Open
yuriy-davygora opened this issue Jun 28, 2018 · 12 comments

Comments

@yuriy-davygora

yuriy-davygora commented Jun 28, 2018

I am trying out a basic distributed "Hello World" job using Dask on a YARN cluster. I read some data from HDFS, map some columns, and then write the result to a different HDFS folder. The last step fails with the following error:

Traceback (most recent call last):
  File "./mgmt_score_dist.py", line 52, in <module>
    main()
  File "./mgmt_score_dist.py", line 47, in main
    'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist_flat.parquet',
  File "./mgmt_score_dist.py", line 33, in mgmt_score_dist
    source_df.to_csv(output, header=False, sep='\t')
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/core.py", line 1091, in to_csv
    return to_csv(self, filename, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 577, in to_csv
    delayed(values).compute(get=get, scheduler=scheduler)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/base.py", line 402, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 2159, in get
    direct=direct)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 1562, in gather
    asynchronous=asynchronous)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 652, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/utils.py", line 275, in sync
    six.reraise(*error[0])
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/utils.py", line 260, in f
    result[0] = yield make_coro()
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/concurrent.py", line 260, in result
    raise_exc_info(self._exc_info)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 1439, in _gather
    traceback)
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 438, in _to_csv_chunk
    
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/bytes/core.py", line 166, in __enter__
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/bytes/hdfs3.py", line 24, in open
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/hdfs3/core.py", line 237, in open
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/hdfs3/core.py", line 709, in __init__
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/hdfs3/core.py", line 719, in _set_handle
IOError: Could not open file: /user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist.tsv/0.part, mode: wb Permission denied: user=yarn, access=WRITE, inode="/user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist.tsv/0.part":yuriyd:yuriyd:drwxr-xr-x

I searched for this error and, apparently, it occurs because the write to HDFS is performed as the 'yarn' user rather than as my own user. I haven't found anything in the documentation or in the source code about setting the user. I tried initializing DaskYARNCluster with user='yuriyd', but I still got the same error.
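For readers decoding the error above: the message names the acting user, the requested access, and the inode's owner, group, and mode string. The following is a minimal sketch, not part of hdfs3 or Hadoop. `parse_hdfs_denial` and `can_write` are hypothetical helpers, the regular expression is assumed from the message text in this issue, and the check covers only the basic owner/group/other bits (no ACLs, no superuser).

```python
import re

# Pattern assumed from the error text in this issue; other Hadoop
# versions may format the message differently.
_DENIAL = re.compile(
    r'user=(?P<user>[^,]+), access=(?P<access>\w+), '
    r'inode="(?P<path>[^"]+)":(?P<owner>[^:]+):(?P<group>[^:]+):'
    r'(?P<mode>[-a-z]{10})'
)


def parse_hdfs_denial(message):
    """Return the fields of a 'Permission denied' message, or None."""
    match = _DENIAL.search(message)
    return match.groupdict() if match else None


def can_write(info, user_groups=()):
    """Apply the basic owner/group/other write check to a parsed message."""
    mode = info["mode"]  # e.g. 'drwxr-xr-x'
    if info["user"] == info["owner"]:
        return mode[2] == "w"   # owner write bit
    if info["group"] in user_groups:
        return mode[5] == "w"   # group write bit
    return mode[8] == "w"       # other write bit


message = (
    'Permission denied: user=yarn, access=WRITE, '
    'inode="/user/yuriyd/daily_staging/2018-06-20-spark/group/dask/'
    'mgmt_score_dist.tsv/0.part":yuriyd:yuriyd:drwxr-xr-x'
)
info = parse_hdfs_denial(message)
print(info["user"], info["owner"], info["mode"], can_write(info))
```

With the message from this issue, the acting user is `yarn`, the owner is `yuriyd`, and the "other" triplet is `r-x`, so the write is refused. That matches the observation that the containers are not writing as the submitting user.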

Any assistance or advice will be greatly appreciated.

Here is my code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import dask.dataframe as dd
from dask.distributed import Client
from dask_yarn import DaskYARNCluster
from json import loads


def load_respondable_sources(input_sources):
    source_df = dd.read_table("%s/*" % input_sources, usecols=[0, 1], header=None, names=['source_id', 'data'],
                              converters={'data': loads})
    source_df['source_respondable'] = source_df['data'].map(lambda x: x[1])
    source_df['response_min_date'] = source_df['data'].map(lambda x: x[2])
    # source_df['response_max_date'] = source_df['data'].map(lambda x: x[3])
    source_df = source_df.drop('data', axis=1)  # drop() returns a new frame; keep the result
    return source_df.loc[source_df['source_respondable']]


def mgmt_score_dist(
        input_mgmt_response,  # type: str
        input_reviews,  # type: str
        input_sources,  # type: str
        output,  # type: str
        flat_output,  # type: str
):
    cluster = DaskYARNCluster(env='mgmt_score_dist_dask.zip')
    client = Client(cluster)

    cluster.start(2, cpus=2, memory=1024, envvars={'KNIT_LANG': 'C.UTF-8', 'DASK_CONFIG': '/tmp'})

    source_df = load_respondable_sources(input_sources)
    source_df.to_csv(output, header=False, sep='\t')

    # TODO

    client.close()
    cluster.close()


def main():
    mgmt_score_dist(
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dump/mgmt_response.tsv',
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dump/dump_reviews.tsv',
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dump/source.tsv',
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist.tsv',
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist_flat.parquet',
    )


if __name__ == '__main__':
    main()

UPDATE: I have temporarily granted every user write permissions (hadoop fs -chmod -R 777 ...), but now I get a different error:

Traceback (most recent call last):
  File "./mgmt_score_dist.py", line 53, in <module>
    main()
  File "./mgmt_score_dist.py", line 48, in main
    'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist_flat.parquet',
  File "./mgmt_score_dist.py", line 34, in mgmt_score_dist
    source_df.to_csv(output, header=False, sep='\t')
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/core.py", line 1091, in to_csv
    return to_csv(self, filename, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 577, in to_csv
    delayed(values).compute(get=get, scheduler=scheduler)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/base.py", line 402, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 2159, in get
    direct=direct)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 1562, in gather
    asynchronous=asynchronous)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 652, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/utils.py", line 275, in sync
    six.reraise(*error[0])
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/utils.py", line 260, in f
    result[0] = yield make_coro()
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/concurrent.py", line 260, in result
    raise_exc_info(self._exc_info)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 1439, in _gather
    traceback)
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45493/container_1512662857247_45493_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 439, in _to_csv_chunk
    # finally, compare results against first row and "vote"
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45493/container_1512662857247_45493_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/pandas/core/frame.py", line 1745, in to_csv
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45493/container_1512662857247_45493_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/pandas/io/formats/csvs.py", line 161, in save
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45493/container_1512662857247_45493_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/bytes/utils.py", line 136, in __getattr__
AttributeError: 'HDFile' object has no attribute 'getvalue'
@martindurant
Member

@jcrist , do containers created by skein have credentials to write to HDFS as the initiating user?

@jcrist
Member

jcrist commented Jun 28, 2018

Yes. The delegation token for the default filesystem is provided in each container and is picked up automatically by libhdfs (the backend for pyarrow's hdfs reader), and maybe by libhdfs3 (I haven't tested). Currently skein doesn't handle delegation token renewal, so this will stop working once the token expires, but that should only matter for long-running jobs (> 1 day).

@martindurant
Member

Thanks @jcrist.
@yuriy-davygora, would you like to try using skein?

@yuriy-davygora
Author

@martindurant

The only reason I tried it with knit was that I wanted to have a quick and easy test, but if it does not work, then I'll try skein.

One more question, though: when I set the permissions to 777 for everything, I received another error (see the update to my original question). I was using hdfs3 at the time. I also tried pyarrow, but it complained about not being able to load libhdfs, and I could not get it to run with libhdfs3. Is this again a knit-related issue, or is this something else?

@jcrist
Member

jcrist commented Jun 28, 2018

Oops, actually, on reading your error message it looks like you're using simple authentication instead of Kerberos. In that case, no: skein has the same bug under simple authentication. I've been putting off fixing it in favor of other things, but I'll track that down today. It should be a simple fix.

> I also tried pyarrow, but it complained about not being able to load libhdfs

Pyarrow should work fine on any system, but you may need to set some environment variables, see: https://arrow.apache.org/docs/python/filesystems.html#hadoop-file-system-hdfs
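A quick way to see which of those variables are missing before retrying pyarrow is sketched below. The variable names (`JAVA_HOME`, `HADOOP_HOME`, `CLASSPATH`, plus the optional `ARROW_LIBHDFS_DIR`) are taken from my reading of the linked Arrow page and should be checked against the docs for the installed pyarrow version. `missing_hdfs_env` is a hypothetical helper, not a pyarrow function.

```python
import os

# Names assumed from the Arrow HDFS documentation linked above.
REQUIRED = ("JAVA_HOME", "HADOOP_HOME", "CLASSPATH")
# Only needed when libhdfs.so is not under $HADOOP_HOME/lib/native.
OPTIONAL = ("ARROW_LIBHDFS_DIR",)


def missing_hdfs_env(environ=None):
    """Return the required variable names that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_hdfs_env()
    if missing:
        print("Unset or empty: " + ", ".join(missing))
    else:
        print("All required variables are set.")
```

On a YARN cluster these variables would need to be set inside the worker containers as well as on the client, since the workers are the processes that open the HDFS files.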

@martindurant
Member

@TomAugspurger, did something change in pandas to_csv? Calling .getvalue() seems to mean it's assuming the file-like object is some kind of BytesIO. This looks like an hdfs3 issue, but I think it has also shown up in one of the other file-system backends.

@TomAugspurger
Member

What version of pandas? 0.23.0 introduced a couple of bugs when using compression: pandas-dev/pandas#21144 and pandas-dev/pandas#17778, and I think the fixes for those introduced another one, whose fix is being included in 0.23.2.

@martindurant
Member

Ah, correct, this seems to be in the ZIP branch (https://github.com/pandas-dev/pandas/blob/master/pandas/io/formats/csvs.py#L176), but the dask file object also handles compression internally. Not sure what to do about that. @yuriy-davygora, you could try passing compression= to to_csv and trying various values.
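To illustrate why the `.getvalue()` call fails, here is a minimal sketch. It is not the pandas, dask, or hdfs3 code; `FileWrapper` and `RemoteFile` are hypothetical stand-ins for a wrapper that forwards unknown attributes and for a remote write handle with no in-memory buffer.

```python
import io


class RemoteFile(object):
    """Hypothetical write-only handle: it has write(), but no getvalue()."""

    def __init__(self):
        self.chunks = []

    def write(self, data):
        self.chunks.append(data)
        return len(data)


class FileWrapper(object):
    """Hypothetical wrapper that forwards unknown attributes to the raw file."""

    def __init__(self, raw):
        self._raw = raw

    def __getattr__(self, name):
        # Called only when normal lookup fails; delegate to the wrapped
        # object, which raises AttributeError if it lacks the attribute.
        return getattr(self._raw, name)


def has_getvalue(fileobj):
    """True if the object offers an in-memory buffer via getvalue()."""
    return callable(getattr(fileobj, "getvalue", None))


print(has_getvalue(io.BytesIO()))               # in-memory buffer
print(has_getvalue(FileWrapper(RemoteFile())))  # forwarded lookup fails
```

Any code path that assumes `getvalue()` exists works for `io.BytesIO` and `io.StringIO` but raises `AttributeError` for a handle like `RemoteFile`, and the error names the wrapped class, much as the traceback above names `HDFile`.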

@yuriy-davygora
Author

@TomAugspurger: conda installed 0.23.1 automatically; I did not specify the pandas version explicitly. I will try 0.23.2 tomorrow.

@jcrist Thank you for your answer, I will give pyarrow another go tomorrow.

@TomAugspurger
Member

TomAugspurger commented Jun 28, 2018 via email

@jcrist
Member

jcrist commented Jul 3, 2018

@yuriy-davygora, dask-yarn (https://dask-yarn.readthedocs.io/en/latest/) has been released and now uses Skein (https://jcrist.github.io/skein/index.html), a more robust library for python/yarn interaction. The above permissions issue has been addressed there.

As for the to_csv issue, I'm not sure what needs to be done here. It doesn't necessarily have anything to do with Hadoop, and is more a bug(?) in dask's bytes handling/to_csv functions. @TomAugspurger, @martindurant, any ideas on what, if anything, needs to be done here?

@TomAugspurger
Member

TomAugspurger commented Jul 3, 2018 via email
