Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Crypto dataset from coingecko #733

Merged
merged 12 commits into from
Dec 31, 2021
54 changes: 54 additions & 0 deletions scripts/data_collector/crypto/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Collect Crypto Data

> *Please pay **ATTENTION** that the data is collected from [Coingecko](https://www.coingecko.com/en/api) and the data might not be perfect. We recommend users to prepare their own data if they have high-quality dataset. For more information, users can refer to the [related document](https://qlib.readthedocs.io/en/latest/component/data.html#converting-csv-format-into-qlib-format)*

## Requirements

```bash
pip install -r requirements.txt
```

## Usage of the dataset
> *Crypto dateset only support Data retrieval function but not support backtest function due to the lack of OHLC data.*

## Collector Data


### Crypto Data

#### 1d from Coingecko

```bash

# download from https://api.coingecko.com/api/v3/
python collector.py download_data --source_dir ~/.qlib/crypto_data/source/1d --start 2015-01-01 --end 2021-11-30 --delay 1 --interval 1d

# normalize
python collector.py normalize_data --source_dir ~/.qlib/crypto_data/source/1d --normalize_dir ~/.qlib/crypto_data/source/1d_nor --interval 1d --date_field_name date

# dump data
cd qlib/scripts
python dump_bin.py dump_all --csv_path ~/.qlib/crypto_data/source/1d_nor --qlib_dir ~/.qlib/qlib_data/crypto_data --freq day --date_field_name date --include_fields prices,total_volumes,market_caps

```

### using data

```python
import qlib
from qlib.data import D

qlib.init(provider_uri="~/.qlib/qlib_data/crypto_data")
df = D.features(D.instruments(market="all"), ["$prices", "$total_volumes","$market_caps"], freq="day")
```


### Help
```bash
python collector.py collector_data --help
```

## Parameters

- interval: 1d
- delay: 1
278 changes: 278 additions & 0 deletions scripts/data_collector/crypto/collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
import abc
import sys
import datetime
from abc import ABC
from pathlib import Path

import fire
import requests
import pandas as pd
from loguru import logger
from dateutil.tz import tzlocal

CUR_DIR = Path(__file__).resolve().parent
sys.path.append(str(CUR_DIR.parent.parent))
from data_collector.base import BaseCollector, BaseNormalize, BaseRun
from data_collector.utils import get_cg_crypto_symbols

from pycoingecko import CoinGeckoAPI
from time import mktime
from datetime import datetime as dt
import time


class CryptoCollector(BaseCollector):
def __init__(
self,
save_dir: [str, Path],
start=None,
end=None,
interval="1d",
max_workers=1,
max_collector_count=2,
delay=1, # delay need to be one
check_data_length: int = None,
limit_nums: int = None,
):
"""

Parameters
----------
save_dir: str
crypto save dir
max_workers: int
workers, default 4
max_collector_count: int
default 2
delay: float
time.sleep(delay), default 0
interval: str
freq, value from [1min, 1d], default 1min
start: str
start datetime, default None
end: str
end datetime, default None
check_data_length: int
check data length, if not None and greater than 0, each symbol will be considered complete if its data length is greater than or equal to this value, otherwise it will be fetched again, the maximum number of fetches being (max_collector_count). By default None.
limit_nums: int
using for debug, by default None
"""
super(CryptoCollector, self).__init__(
save_dir=save_dir,
start=start,
end=end,
interval=interval,
max_workers=max_workers,
max_collector_count=max_collector_count,
delay=delay,
check_data_length=check_data_length,
limit_nums=limit_nums,
)

self.init_datetime()

def init_datetime(self):
if self.interval == self.INTERVAL_1min:
self.start_datetime = max(self.start_datetime, self.DEFAULT_START_DATETIME_1MIN)
elif self.interval == self.INTERVAL_1d:
pass
else:
raise ValueError(f"interval error: {self.interval}")

self.start_datetime = self.convert_datetime(self.start_datetime, self._timezone)
self.end_datetime = self.convert_datetime(self.end_datetime, self._timezone)

@staticmethod
def convert_datetime(dt: [pd.Timestamp, datetime.date, str], timezone):
try:
dt = pd.Timestamp(dt, tz=timezone).timestamp()
dt = pd.Timestamp(dt, tz=tzlocal(), unit="s")
except ValueError as e:
pass
return dt

@property
@abc.abstractmethod
def _timezone(self):
raise NotImplementedError("rewrite get_timezone")

@staticmethod
def get_data_from_remote(symbol, interval, start, end):
error_msg = f"{symbol}-{interval}-{start}-{end}"
try:
cg = CoinGeckoAPI()
data = cg.get_coin_market_chart_by_id(id=symbol, vs_currency="usd", days="max")
_resp = pd.DataFrame(columns=["date"] + list(data.keys()))
_resp["date"] = [dt.fromtimestamp(mktime(time.localtime(x[0] / 1000))) for x in data["prices"]]
for key in data.keys():
_resp[key] = [x[1] for x in data[key]]
_resp["date"] = pd.to_datetime(_resp["date"])
_resp["date"] = [x.date() for x in _resp["date"]]
_resp = _resp[(_resp["date"] < pd.to_datetime(end).date()) & (_resp["date"] > pd.to_datetime(start).date())]
if _resp.shape[0] != 0:
_resp = _resp.reset_index()
if isinstance(_resp, pd.DataFrame):
return _resp.reset_index()
except Exception as e:
logger.warning(f"{error_msg}:{e}")

def get_data(
self, symbol: str, interval: str, start_datetime: pd.Timestamp, end_datetime: pd.Timestamp
) -> [pd.DataFrame]:
def _get_simple(start_, end_):
self.sleep()
_remote_interval = interval
return self.get_data_from_remote(
symbol,
interval=_remote_interval,
start=start_,
end=end_,
)

if interval == self.INTERVAL_1d:
_result = _get_simple(start_datetime, end_datetime)
else:
raise ValueError(f"cannot support {interval}")
return _result


class CryptoCollector1d(CryptoCollector, ABC):
def get_instrument_list(self):
logger.info("get coingecko crypto symbols......")
symbols = get_cg_crypto_symbols()
logger.info(f"get {len(symbols)} symbols.")
return symbols

def normalize_symbol(self, symbol):
return symbol

@property
def _timezone(self):
return "Asia/Shanghai"


class CryptoNormalize(BaseNormalize):
DAILY_FORMAT = "%Y-%m-%d"

@staticmethod
def normalize_crypto(
df: pd.DataFrame,
calendar_list: list = None,
date_field_name: str = "date",
symbol_field_name: str = "symbol",
):
if df.empty:
return df
df = df.copy()
df.set_index(date_field_name, inplace=True)
df.index = pd.to_datetime(df.index)
df = df[~df.index.duplicated(keep="first")]
if calendar_list is not None:
df = df.reindex(
pd.DataFrame(index=calendar_list)
.loc[
pd.Timestamp(df.index.min()).date() : pd.Timestamp(df.index.max()).date()
+ pd.Timedelta(hours=23, minutes=59)
]
.index
)
df.sort_index(inplace=True)

df.index.names = [date_field_name]
return df.reset_index()

def normalize(self, df: pd.DataFrame) -> pd.DataFrame:
df = self.normalize_crypto(df, self._calendar_list, self._date_field_name, self._symbol_field_name)
return df


class CryptoNormalize1d(CryptoNormalize):
zhupr marked this conversation as resolved.
Show resolved Hide resolved
def _get_calendar_list(self):
return None


class Run(BaseRun):
def __init__(self, source_dir=None, normalize_dir=None, max_workers=1, interval="1d"):
"""

Parameters
----------
source_dir: str
The directory where the raw data collected from the Internet is saved, default "Path(__file__).parent/source"
normalize_dir: str
Directory for normalize data, default "Path(__file__).parent/normalize"
max_workers: int
Concurrent number, default is 1
interval: str
freq, value from [1min, 1d], default 1d
"""
super().__init__(source_dir, normalize_dir, max_workers, interval)

@property
def collector_class_name(self):
return f"CryptoCollector{self.interval}"

@property
def normalize_class_name(self):
return f"CryptoNormalize{self.interval}"

@property
def default_base_dir(self) -> [Path, str]:
return CUR_DIR

def download_data(
self,
max_collector_count=2,
delay=0,
start=None,
end=None,
interval="1d",
check_data_length: int = None,
limit_nums=None,
):
"""download data from Internet

Parameters
----------
max_collector_count: int
default 2
delay: float
time.sleep(delay), default 0
interval: str
freq, value from [1min, 1d], default 1d, currently only supprot 1d
start: str
start datetime, default "2000-01-01"
end: str
end datetime, default ``pd.Timestamp(datetime.datetime.now() + pd.Timedelta(days=1))``
check_data_length: int # if this param useful?
check data length, if not None and greater than 0, each symbol will be considered complete if its data length is greater than or equal to this value, otherwise it will be fetched again, the maximum number of fetches being (max_collector_count). By default None.
limit_nums: int
using for debug, by default None

Examples
---------
# get daily data
$ python collector.py download_data --source_dir ~/.qlib/crypto_data/source/1d --start 2015-01-01 --end 2021-11-30 --delay 1 --interval 1d
"""

super(Run, self).download_data(max_collector_count, delay, start, end, interval, check_data_length, limit_nums)

def normalize_data(self, date_field_name: str = "date", symbol_field_name: str = "symbol"):
"""normalize data

Parameters
----------
date_field_name: str
date field name, default date
symbol_field_name: str
symbol field name, default symbol

Examples
---------
$ python collector.py normalize_data --source_dir ~/.qlib/crypto_data/source/1d --normalize_dir ~/.qlib/crypto_data/source/1d_nor --interval 1d --date_field_name date
"""
super(Run, self).normalize_data(date_field_name, symbol_field_name)


if __name__ == "__main__":
fire.Fire(Run)
8 changes: 8 additions & 0 deletions scripts/data_collector/crypto/requirement.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
loguru
fire
requests
numpy
pandas
tqdm
lxml
pycoingecko
33 changes: 33 additions & 0 deletions scripts/data_collector/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from tqdm import tqdm
from functools import partial
from concurrent.futures import ProcessPoolExecutor
from pycoingecko import CoinGeckoAPI

HS_SYMBOLS_URL = "http://app.finance.ifeng.com/hq/list.php?type=stock_a&class={s_type}"

Expand All @@ -42,6 +43,7 @@
_US_SYMBOLS = None
_IN_SYMBOLS = None
_EN_FUND_SYMBOLS = None
_CG_CRYPTO_SYMBOLS = None
_CALENDAR_MAP = {}

# NOTE: Until 2020-10-20 20:00:00
Expand Down Expand Up @@ -377,6 +379,37 @@ def _get_eastmoney():
return _EN_FUND_SYMBOLS


def get_cg_crypto_symbols(qlib_data_path: [str, Path] = None) -> list:
"""get crypto symbols in coingecko

Returns
-------
crypto symbols in given exchanges list of coingecko
"""
global _CG_CRYPTO_SYMBOLS

@deco_retry
def _get_coingecko():
try:
cg = CoinGeckoAPI()
resp = pd.DataFrame(cg.get_coins_markets(vs_currency="usd"))
except:
raise ValueError("request error")
try:
_symbols = resp["id"].to_list()
except Exception as e:
logger.warning(f"request error: {e}")
raise
return _symbols

if _CG_CRYPTO_SYMBOLS is None:
_all_symbols = _get_coingecko()

_CG_CRYPTO_SYMBOLS = sorted(set(_all_symbols))

return _CG_CRYPTO_SYMBOLS


def symbol_suffix_to_prefix(symbol: str, capital: bool = True) -> str:
"""symbol suffix to prefix

Expand Down