-
Notifications
You must be signed in to change notification settings - Fork 56
/
trendline_automation.py
203 lines (145 loc) · 6.6 KB
/
trendline_automation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
def check_trend_line(support: bool, pivot: int, slope: float, y: np.array):
# compute sum of differences between line and prices,
# return negative val if invalid
# Find the intercept of the line going through pivot point with given slope
intercept = -slope * pivot + y[pivot]
line_vals = slope * np.arange(len(y)) + intercept
diffs = line_vals - y
# Check to see if the line is valid, return -1 if it is not valid.
if support and diffs.max() > 1e-5:
return -1.0
elif not support and diffs.min() < -1e-5:
return -1.0
# Squared sum of diffs between data and line
err = (diffs ** 2.0).sum()
return err;
def optimize_slope(support: bool, pivot:int , init_slope: float, y: np.array):
# Amount to change slope by. Multiplyed by opt_step
slope_unit = (y.max() - y.min()) / len(y)
# Optmization variables
opt_step = 1.0
min_step = 0.0001
curr_step = opt_step # current step
# Initiate at the slope of the line of best fit
best_slope = init_slope
best_err = check_trend_line(support, pivot, init_slope, y)
assert(best_err >= 0.0) # Shouldn't ever fail with initial slope
get_derivative = True
derivative = None
while curr_step > min_step:
if get_derivative:
# Numerical differentiation, increase slope by very small amount
# to see if error increases/decreases.
# Gives us the direction to change slope.
slope_change = best_slope + slope_unit * min_step
test_err = check_trend_line(support, pivot, slope_change, y)
derivative = test_err - best_err;
# If increasing by a small amount fails,
# try decreasing by a small amount
if test_err < 0.0:
slope_change = best_slope - slope_unit * min_step
test_err = check_trend_line(support, pivot, slope_change, y)
derivative = best_err - test_err
if test_err < 0.0: # Derivative failed, give up
raise Exception("Derivative failed. Check your data. ")
get_derivative = False
if derivative > 0.0: # Increasing slope increased error
test_slope = best_slope - slope_unit * curr_step
else: # Increasing slope decreased error
test_slope = best_slope + slope_unit * curr_step
test_err = check_trend_line(support, pivot, test_slope, y)
if test_err < 0 or test_err >= best_err:
# slope failed/didn't reduce error
curr_step *= 0.5 # Reduce step size
else: # test slope reduced error
best_err = test_err
best_slope = test_slope
get_derivative = True # Recompute derivative
# Optimize done, return best slope and intercept
return (best_slope, -best_slope * pivot + y[pivot])
def fit_trendlines_single(data: np.array):
# find line of best fit (least squared)
# coefs[0] = slope, coefs[1] = intercept
x = np.arange(len(data))
coefs = np.polyfit(x, data, 1)
# Get points of line.
line_points = coefs[0] * x + coefs[1]
# Find upper and lower pivot points
upper_pivot = (data - line_points).argmax()
lower_pivot = (data - line_points).argmin()
# Optimize the slope for both trend lines
support_coefs = optimize_slope(True, lower_pivot, coefs[0], data)
resist_coefs = optimize_slope(False, upper_pivot, coefs[0], data)
return (support_coefs, resist_coefs)
def fit_trendlines_high_low(high: np.array, low: np.array, close: np.array):
x = np.arange(len(close))
coefs = np.polyfit(x, close, 1)
# coefs[0] = slope, coefs[1] = intercept
line_points = coefs[0] * x + coefs[1]
upper_pivot = (high - line_points).argmax()
lower_pivot = (low - line_points).argmin()
support_coefs = optimize_slope(True, lower_pivot, coefs[0], low)
resist_coefs = optimize_slope(False, upper_pivot, coefs[0], high)
return (support_coefs, resist_coefs)
# Load data
data = pd.read_csv('BTCUSDT86400.csv')
data['date'] = data['date'].astype('datetime64[s]')
data = data.set_index('date')
# Take natural log of data to resolve price scaling issues
data = np.log(data)
# Trendline parameter
lookback = 30
support_slope = [np.nan] * len(data)
resist_slope = [np.nan] * len(data)
for i in range(lookback - 1, len(data)):
candles = data.iloc[i - lookback + 1: i + 1]
support_coefs, resist_coefs = fit_trendlines_high_low(candles['high'],
candles['low'],
candles['close'])
support_slope[i] = support_coefs[0]
resist_slope[i] = resist_coefs[0]
data['support_slope'] = support_slope
data['resist_slope'] = resist_slope
plt.style.use('dark_background')
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
data['close'].plot(ax=ax1)
data['support_slope'].plot(ax=ax2, label='Support Slope', color='green')
data['resist_slope'].plot(ax=ax2, label='Resistance Slope', color='red')
plt.title("Trend Line Slopes BTC-USDT Daily")
plt.legend()
plt.show()
'''
# Plot Trendlines on candles
# Library for plotting candles
# pip install mplfinance
import mplfinance as mpf
candles = data.iloc[-30:] # Last 30 candles in data
support_coefs_c, resist_coefs_c = fit_trendlines_single(candles['close'])
support_coefs, resist_coefs = fit_trendlines_high_low(candles['high'], candles['low'], candles['close'])
support_line_c = support_coefs_c[0] * np.arange(len(candles)) + support_coefs_c[1]
resist_line_c = resist_coefs_c[0] * np.arange(len(candles)) + resist_coefs_c[1]
support_line = support_coefs[0] * np.arange(len(candles)) + support_coefs[1]
resist_line = resist_coefs[0] * np.arange(len(candles)) + resist_coefs[1]
plt.style.use('dark_background')
ax = plt.gca()
def get_line_points(candles, line_points):
# Place line points in tuples for matplotlib finance
# https://github.com/matplotlib/mplfinance/blob/master/examples/using_lines.ipynb
idx = candles.index
line_i = len(candles) - len(line_points)
assert(line_i >= 0)
points = []
for i in range(line_i, len(candles)):
points.append((idx[i], line_points[i - line_i]))
return points
s_seq = get_line_points(candles, support_line)
r_seq = get_line_points(candles, resist_line)
s_seq2 = get_line_points(candles, support_line_c)
r_seq2 = get_line_points(candles, resist_line_c)
mpf.plot(candles, alines=dict(alines=[s_seq, r_seq, s_seq2, r_seq2], colors=['w', 'w', 'b', 'b']), type='candle', style='charles', ax=ax)
plt.show()
'''