Module areixio.utils.indicator
Classes
class BarGenerator (on_bar: Callable = None, window: int = 0, on_window_bar: Callable = None, hour_window: bool = False, strict_agg: bool = False)
-
- generating 1 minute bar data from tick data
- generating x minute bar/x hour bar data from 1 minute data
Args
on_bar
:Callable
, optional- callback function when new bar generated. Defaults to None.
window
:int
, optional- window size. Defaults to 0. If hour_window is False, this is the x of an x-minute bar, and x must divide 60 evenly: 2, 3, 5, 6, 10, 15, 20, 30. If hour_window is True, this is the x of an x-hour bar, and x can be any number.
on_window_bar
:Callable
, optional- callback function when new window bar generated. Defaults to None.
hour_window
:bool
, optional- whether this generator is for generating hour-window bars. Defaults to False.
Class variables
var hour_window_map
Methods
def update_bar(self, bar: Union[dict, pandas.core.series.Series]) ‑> None
-
Update new bar data into array manager.
def update_bar_hour_window(self, bar) ‑> None
-
- 1 minute bar to 1 hour bar
- 1 minute bar to 4 hour bar
- 15 minute bar to 1 hour bar
- 15 minute bar to 4 hour bar
- 1 hour bar to 1 hour bar
- 1 hour bar to 4 hour bar
- 2 hour bar to 4 hour bar
1: 0, 1, 2, 3, 4, 5, 6, 7, ..., 22, 23; 2: 0, 2, 4, 6, 8, ..., 20, 22; 4: 0, 4, 8, 12, 16, 20; 6: 2, 8, 14, 20; 8: 0, 8, 16; 12: 0, 12
def update_bar_minute_window(self, bar) ‑> None
def update_tick(self, tick: Union[dict, pandas.core.series.Series]) ‑> None
-
Update new tick data into generator.
class Indicator (size: int = 100, window: int = 1, hour_window: bool = False, strict_agg: bool = False, datafeed=None)
-
Args
size
:int
, optional- store size. Defaults to 100.
window
:int
, optional- time window. Defaults to 1.
hour_window
:bool
, optional- if True, bar will be hour bar, otherwise, it is minute bar. Defaults to False.
strict_agg
:bool
, optional- mainly for 4h & 6h bar data; if True, the aggregated datetime will be the same as the exchange's. Defaults to False.
datafeed
:_type_
, optional- if specified, will automatically download the `size` data so that the indicator will be successfully initialized. Defaults to None.
Raises
ImportError
- if ta-lib is not installed
Instance variables
var close : numpy.ndarray
-
Get close price time series.
var high : numpy.ndarray
-
Get high price time series.
var low : numpy.ndarray
-
Get low price time series.
var open : numpy.ndarray
-
Get open price time series.
var ticks : numpy.ndarray
-
Get datetime time series.
var turnover : numpy.ndarray
-
Get trading turnover time series.
var volume : numpy.ndarray
-
Get trading volume time series.
var zeros : numpy.ndarray
Methods
def ad(self, array: bool = False) ‑> Union[float, numpy.ndarray]
-
Chaikin A/D Line
def adosc(self, fast_period: int, slow_period: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
Chaikin A/D Oscillator AKA Chaikin Money Flow
def adx(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
ADX.
def adx1(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
ADX.
TrueRange = max(max(high-low, abs(high-nz(close[1]))), abs(low-nz(close[1]))) DirectionalMovementPlus = high-nz(high[1]) > nz(low[1])-low ? max(high-nz(high[1]), 0): 0 DirectionalMovementMinus = nz(low[1])-low > high-nz(high[1]) ? max(nz(low[1])-low, 0): 0 SmoothedTrueRange = 0.0 SmoothedTrueRange[i] = nz(SmoothedTrueRange[1]) - (nz(SmoothedTrueRange[1])/len) + TrueRange SmoothedDirectionalMovementPlus = 0.0 SmoothedDirectionalMovementPlus[i] = nz(SmoothedDirectionalMovementPlus[1]) - (nz(SmoothedDirectionalMovementPlus[1])/len) + DirectionalMovementPlus SmoothedDirectionalMovementMinus = 0.0 SmoothedDirectionalMovementMinus[i] = nz(SmoothedDirectionalMovementMinus[1]) - (nz(SmoothedDirectionalMovementMinus[1])/len) + DirectionalMovementMinus DIPlus = SmoothedDirectionalMovementPlus / SmoothedTrueRange * 100 DIMinus = SmoothedDirectionalMovementMinus / SmoothedTrueRange * 100 DX = abs(DIPlus-DIMinus) / (DIPlus+DIMinus)*100 ADX = sma(DX, len)
def adxr(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
ADXR.
def ahr999(self) ‑> Union[float, numpy.ndarray]
-
ahr999 指数 = (比特币价格 / 200日定投成本) * (比特币价格 / 指数增长估值)，其中：200日定投成本 = 过去200日的比特币价格的均值；指数增长估值 = 10^[5.84 * log(币龄) - 17.01]；币龄 = 当前距离2009年1月3日(该日期被认为是btc诞生日)的天数 (timestamp: 1230940800)
当ahr999<0.45时适合抄底, 0.45<ahr999<1.2时适合定投BTC, ahr999>1.2时不是良好的投资btc的时机
def alligator(self, jaw_length: int = 13, teeth_length: int = 8, lips_length: int = 5, jaw_offset: int = 8, teeth_offset: int = 5, lips_offset: int = 3, array: bool = False, input: numpy.ndarray = None)
def apo(self, fast_period: int, slow_period: int, matype: int = 0, array: bool = False) ‑> Union[float, numpy.ndarray]
-
APO.
def aroon(self, n: int, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[float, float]]
-
Aroon indicator.
def aroonosc(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
Aroon Oscillator.
def atr(self, n: int, multiplier: float = 1, array: bool = False, matype: int = 0) ‑> Union[float, numpy.ndarray]
-
Average True Range (ATR).
def av2(self, ma_period: int, ma_period_smoothing: float, ma_type: int = 0, array: bool = False, input: numpy.ndarray = None)
-
Smoothed Heikin Ashi Cloud A-V2: https://www.tradingview.com/script/DTDQ3y76/ B-V2: https://www.tradingview.com/script/acmdQs2A/
An overlay showing “Smoothed Heikin Ashi” that measures the momentum of the trend and identifies potential trend rejections.
def avg(self, *args) ‑> float
-
avg
def bband(self, n: int, dev: float, matype: int = 0, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[float, float]]
-
Bollinger Channel.
def boll(self, n: int, dev: float, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[float, float]]
-
Bollinger Channel.
def bop(self, array: bool = False) ‑> Union[float, numpy.ndarray]
-
BOP.
def cci(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
Commodity Channel Index (CCI).
def chandelier_exit(self, atr_length: int, atr_multiplier: float, array: bool = False, input: numpy.ndarray = None)
-
Chandelier Exit
longStop = (useClose ? highest(close, length) : highest(length)) - atr longStopPrev = nz(longStop[1], longStop) longStop[i] = close[1] > longStopPrev ? max(longStop, longStopPrev) : longStop
shortStop = (useClose ? lowest(close, length) : lowest(length)) + atr shortStopPrev = nz(shortStop[1], shortStop) shortStop[i] = close[1] < shortStopPrev ? min(shortStop, shortStopPrev) : shortStop
var int dir = 1 dir[i] = close > shortStopPrev ? 1 : close < longStopPrev ? -1 : dir
buySignal = dir == 1 and dir[1] == -1 sellSignal = dir == -1 and dir[1] == 1
buypx = buySignal ? longStop : na sellpx = sellSignal ? shortStop : na
def change(self, source: numpy.ndarray) ‑> float
def cmo(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
CMO.
def crossover(self, source1: numpy.ndarray, source2: numpy.ndarray) ‑> bool
def crossunder(self, source1: numpy.ndarray, source2: numpy.ndarray) ‑> bool
def custom(self, func: Callable, array: bool = False, *args, **kwargs) ‑> Union[float, numpy.ndarray]
-
Custom func
Note
All inputs must have the same shape, and the order of the inputs must follow the order of the func parameters.
def dema(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]
-
Double EMA
def donchian(self, n: int, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[float, float]]
-
Donchian Channel.
def dx(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
DX.
def ema(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]
-
Exponential moving average.
def ha(self, is_simple: bool = False, array: bool = False, input_open: numpy.ndarray = None, input_high: numpy.ndarray = None, input_low: numpy.ndarray = None, input_close: numpy.ndarray = None) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float, float]]
-
Heiken Ashi Candles (HA)
def halftrend(self, amplitude: int = 4, deviation: int = 2, array: bool = False, input: numpy.ndarray = None)
def hama(self, ma_open_period: int = 25, ma_high_period: int = 20, ma_low_period: int = 20, ma_close_period: int = 20, ma_type: int = 0, array: bool = False, input: numpy.ndarray = None)
-
Heiken-Ashi Moving Average
https://www.tradingview.com/script/k7nrF2oI-NSDT-HAMA-Candles/
def heatmap_volume(self, ma_length: int = 610, std_length: int = 610, extra_high_threshold: float = 4, high_threshold: float = 2.5, medium_threshold: float = 1, normal_threshold: float = -0.5, array: bool = False, input: numpy.ndarray = None)
-
extra_high high medium normal low
length := length > bar_index + 1 ? bar_index + 1 : length slength := slength > bar_index + 1 ? bar_index + 1 : slength
pstdev(Series, Period) => mean = sum(Series, Period) / Period summation = 0.0 for i=0 to Period-1 sampleMinusMean = nz(Series[i]) - mean summation := summation + sampleMinusMean * sampleMinusMean return = sqrt(summation / Period)
mean = sma(volume, length) std = pstdev(volume, slength) stdbar = (volume - mean) / std dir = close > open v = osc ? volume - mean : volume mosc = osc ? 0 : mean
bcolor = stdbar > thresholdExtraHigh ? dir ? cthresholdExtraHighUp : cthresholdExtraHighDn : stdbar > thresholdHigh ? dir ? cthresholdHighUp : cthresholdHighDn : stdbar > thresholdMedium ? dir ? cthresholdMediumUp : cthresholdMediumDn : stdbar > thresholdNormal ? dir ? cthresholdNormalUp : cthresholdNormalDn : dir ? cthresholdLowUp : cthresholdLowDn
def highest(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]
-
highest.
def hl2(self, array: bool = False) ‑> Union[float, numpy.ndarray]
-
(high + low ) / 2
def hlc3(self, array: bool = False) ‑> Union[float, numpy.ndarray]
-
(high + low + close) / 3
def hma(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]
-
HMA - Hull moving average. HMA = WMA(2*WMA(n/2) - WMA(n), sqrt(n))
def ichimoku(self, tenkan_sen_period: int = 9, kijun_sen_period: int = 26, senkou_span_period: int = 52, chikou_span_period: int = 26, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float, float, float]]
-
Ichimoku Cloud (Ichimoku)
def ichimoku1(self, tenkan_sen_period: int = 9, kijun_sen_period: int = 26, senkou_span_period: int = 52, chikou_span_period: int = 26, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float, float, float]]
-
Ichimoku Cloud (Ichimoku)
def impulse_macd(self, ma_period: int = 34, signal_period: int = 9, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float]]
-
Impulse Macd
def kama(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
KAMA.
def kdj(self, n: int, smooth_k: int, smooth_d: int, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float]]
-
Stochastic Relative Strength Index
def kdj0(self, n: int, smooth_k: int, smooth_d: int, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float]]
-
Stochastic Relative Strength Index
def kdj1(self, n: int, fastk_period: int, fastd_period: int, fastd_matype: int = 0, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float]]
-
KDJ
def keltner(self, n: int, dev: float, atr_length: int = None, matype: int = 0, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[float, float]]
-
Keltner Channel.
def lowest(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]
-
lowest.
def macd(self, fast_period: int = 13, slow_period: int = 34, signal_period: int = 9, matype: int = 0, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float]]
-
MACD. 0-SMA; 1-EMA
def macd_divergence(self, fast_period: int = 13, slow_period: int = 34, signal_period: int = 9, matype: int = 0, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float]]
def max(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]
-
max.
def maximum(self, input1: numpy.ndarray, input2: numpy.ndarray, array: bool = False) ‑> Union[float, numpy.ndarray]
-
maximum.
def mfi(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
Money Flow Index.
def min(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]
-
min.
def minimum(self, input1: numpy.ndarray, input2: numpy.ndarray, array: bool = False) ‑> Union[float, numpy.ndarray]
-
minimum.
def minus_di(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
MINUS_DI.
def minus_dm(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
MINUS_DM.
def mom(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
MOM.
def na(self, source: float) ‑> bool
def natr(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
NATR.
def nz(self, source1: float, source2: float) ‑> float
def obv(self, array: bool = False) ‑> Union[float, numpy.ndarray]
-
On Balance Volume
def pine_sar(self, start: float = 0.02, inc: float = 0.02, maximum: float = 0.2)
def pivothigh(self, left, right=0)
-
https://blog.csdn.net/sumubaiblog/article/details/122095719 通道裡面的最低點跟最高點 在一段震盪行情的高點與低點, RSI 背離
def pivotlow(self, left, right=0)
def plus_di(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
PLUS_DI.
def plus_dm(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
PLUS_DM.
def ppo(self, fast_period: int, slow_period: int, matype: int = 0, array: bool = False) ‑> Union[float, numpy.ndarray]
-
PPO.
def qqe(self, rsi_period: int, rsi_smoothing: int, qqe_factor: int, array: bool = False, input: numpy.ndarray = None)
-
Quantitative Qualitative Estimation (QQE)
The Quantitative Qualitative Estimation (QQE) is similar to SuperTrend but uses a Smoothed RSI with an upper and lower bands. The band width is a combination of a one period True Range of the Smoothed RSI which is double smoothed using Wilder's smoothing length (2 * rsiLength - 1) and multiplied by the default factor of 4.236. A Long trend is determined when the Smoothed RSI crosses the previous upperband and a Short trend when the Smoothed RSI crosses the previous lowerband.
Based on QQE.mq5 by EarnForex Copyright © 2010, based on version by Tim Hyder (2008), based on version by Roman Ignatov (2006)
Sources
https://www.tradingview.com/script/IYfA9R2k-QQE-MT4/ https://www.tradingpedia.com/forex-trading-indicators/quantitative-qualitative-estimation https://www.prorealcode.com/prorealtime-indicators/qqe-quantitative-qualitative-estimation/
Calculation
Default Inputs: length=14, smooth=5, factor=4.236, mamode="ema", drift=1
Args
close
:pd.Series
- Series of 'close's
length
:int
- RSI period. Default: 14
smooth
:int
- RSI smoothing period. Default: 5
factor
:float
- QQE Factor. Default: 4.236
mamode
:str
- See `help(ta.ma)`. Default: 'sma'
drift
:int
- The difference period. Default: 1
offset
:int
- How many periods to offset the result. Default: 0
Kwargs
fillna (value, optional): pd.DataFrame.fillna(value) fill_method (value, optional): Type of fill method
Returns
pd.DataFrame
- QQE, RSI_MA (basis), QQEl (long), and QQEs (short) columns.
qqe_df = cal_qqe(pd.Series(indicator.close), length=self.rsi_length, smooth=self.rsi_smoothing, factor=self.fast_qqe_factor, ).fillna(0.0)
qqe_df = qqe_df if qqe_df is None else qqe_df.fillna(0.0)
print(qqe_df,'????')
qqe_df = cal_qqe(hist.close, )
qqe_long = qqe_df[f"QQEl_{self.rsi_length}{self.rsi_smoothing}"].iloc[-1] qqe_long = qqe_long - 50 if qqe_long else qqe_long qqe_short = qqe_df[f"QQEs_{self.rsi_length}{self.rsi_smoothing}"].iloc[-1] qqe_short = qqe_short - 50 if qqe_short else qqe_short
def rh_inventory_retracement_bar(self, z: int = 45, array: bool = False, input: numpy.ndarray = None)
-
https://cn.tradingview.com/script/VNByUGpE-Rob-Hoffman-s-Inventory-Retracement-Bar-by-UCSgears/
z: Inventory Retracement Percentage %
def rh_overlay_set(self, array: bool = False, input: numpy.ndarray = None)
-
https://cn.tradingview.com/script/al4ABhwU-Rob-Hoffman-Overlay-Set/
plot(a, title = "Fast Speed Line", linewidth = 2, color = #0000FF) plot(b, title = "Slow Speed Line", linewidth = 2, color = fuchsia) plot(c, title = "Fast Primary Trend Line", linewidth = 3, color = #00FF00) plot(d, title = "Slow Primary Trend Line", linewidth = 3, color = #000000) plot(e, title = "Trend Line - 1", linewidth = 3, color = #0000FF, style = circles) plot(f, title = "Trend Line - 2", linewidth = 3, color = #20B2AA) plot(g, title = "Trend Line - 3", linewidth = 3, color = #FF4500) plot(h, title = "Trend Line - 4", linewidth = 3, color = fuchsia)
plot(k, title = "No Trend Zone - Midline", linewidth = 2, color = #3CB371) plot(ku, title = "No Trend Zone - Upperline", linewidth = 2, color = #3CB371) plot(kl, title = "No Trend Zone - Lowerline", linewidth = 2, color = #3CB371)
def rma(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]
-
wildeR's Moving Average (RMA)
def roc(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
ROC.
def rocp(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
ROCP.
def rocr(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
ROCR.
def rocr_100(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
ROCR100.
def rsi(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
Relative Strength Index (RSI).
def sar(self, acceleration: int = 0, maximum: int = 0, array: bool = False) ‑> Union[float, numpy.ndarray]
-
SAR.
start: 0.02, increment: 0.02, maximum: 0.2
def sarext(self, startvalue: int = 0, offsetonreverse: int = 0, accelerationinitlong: int = 0, accelerationlong: int = 0, accelerationmaxlong: int = 0, accelerationinitshort: int = 0, accelerationshort: int = 0, accelerationmaxshort: int = 0, array: bool = False) ‑> Union[float, numpy.ndarray]
-
SAR Extended.
start: 0.02, increment: 0.02, maximum: 0.2
def shift(self, n: int, input: numpy.ndarray, fill_value=nan)
-
shift data
def sma(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]
-
Simple moving average.
def smma(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]
-
Smoothed moving average.
def ssl_channel(self, ma_high_period: int, ma_low_period: int, array: bool = False, input: numpy.ndarray = None)
-
SSL Channel
def stc(self, period: int = 12, fast_period: int = 26, slow_period: int = 50, multiplier: float = 0.5, array: bool = False, input: numpy.ndarray = None)
-
https://cn.tradingview.com/script/WhRRThMI-STC-Indicator-A-Better-MACD-SHK/
AAA = input(0.5) var CCCCC = 0.0 var DDD = 0.0 var DDDDDD = 0.0 var mAAAAA = 0.0
fastMA = ta.ema(close, BBBB) slowMA = ta.ema(close, BBBBB) BBBBBB = fastMA - slowMA
CCC = ta.lowest(BBBBBB, EEEEEE) CCCC = ta.highest(BBBBBB, EEEEEE) - CCC CCCCC := CCCC > 0 ? (BBBBBB - CCC) / CCCC * 100 : nz(CCCCC[1]) DDD := na(DDD[1]) ? CCCCC : DDD[1] + AAA * (CCCCC - DDD[1]) DDDD = ta.lowest(DDD, EEEEEE) DDDDD = ta.highest(DDD, EEEEEE) - DDDD DDDDDD := DDDDD > 0 ? (DDD - DDDD) / DDDDD * 100 : nz(DDDDDD[1]) mAAAAA := na(mAAAAA[1]) ? DDDDDD : mAAAAA[1] + AAA * (DDDDDD - mAAAAA[1])
mColor = mAAAAA > mAAAAA[1] ? color.new(color.green, 20) : color.new(color.red, 20)
def stc_chatgpt(self, period: int = 12, fast_period: int = 26, slow_period: int = 50, multiplier: float = 0.5, array: bool = False, input: numpy.ndarray = None)
def std(self, n: int, nbdev: int = 1, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]
-
Standard deviation.
def stoch(self, fastk_period: int, slowk_period: int, slowk_matype: int, slowd_period: int, slowd_matype: int, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[float, float]]
-
Stochastic Indicator
def sum(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]
-
Summation.
def supertrend(self, period, multiplier: int = 1, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[float, str]]
-
SuperTrend
SuperTrend Algorithm :
BASIC UPPERBAND = (HIGH + LOW) / 2 + Multiplier * ATR
BASIC LOWERBAND = (HIGH + LOW) / 2 - Multiplier * ATR
FINAL UPPERBAND = IF ((Current BASIC UPPERBAND < Previous FINAL UPPERBAND) or (Previous Close > Previous FINAL UPPERBAND)) THEN Current BASIC UPPERBAND ELSE Previous FINAL UPPERBAND
FINAL LOWERBAND = IF ((Current BASIC LOWERBAND > Previous FINAL LOWERBAND) or (Previous Close < Previous FINAL LOWERBAND)) THEN Current BASIC LOWERBAND ELSE Previous FINAL LOWERBAND
SUPERTREND = IF ((Previous SUPERTREND = Previous FINAL UPPERBAND) and (Current Close <= Current FINAL UPPERBAND)) THEN Current FINAL UPPERBAND ELSE IF ((Previous SUPERTREND = Previous FINAL UPPERBAND) and (Current Close > Current FINAL UPPERBAND)) THEN Current FINAL LOWERBAND ELSE IF ((Previous SUPERTREND = Previous FINAL LOWERBAND) and (Current Close >= Current FINAL LOWERBAND)) THEN Current FINAL LOWERBAND ELSE IF ((Previous SUPERTREND = Previous FINAL LOWERBAND) and (Current Close < Current FINAL LOWERBAND)) THEN Current FINAL UPPERBAND
def tr(self, array: bool = False) ‑> Union[float, numpy.ndarray]
-
True Range (TR).
def trange(self, array: bool = False) ‑> Union[float, numpy.ndarray]
-
TRANGE.
def trendlines_with_breaks(self, length: int = 14, slope: float = 1, method: str = 'atr', array: bool = False, input: numpy.ndarray = None)
-
indicator("Trendlines with Breaks [LuxAlgo]",overlay=true) length = input.int(14) k = input.float(1.,'Slope',minval=0,step=.1) method = input.string('Atr','Slope Calculation Method', options=['Atr','Stdev','Linreg']) show = input(false,'Show Only Confirmed Breakouts') //---- upper = 0.,lower = 0. slope_ph = 0.,slope_pl = 0. src = close n = bar_index //---- ph = ta.pivothigh(length,length) pl = ta.pivotlow(length,length) plot(ph,'pivothigh',color =color.white) plot(pl,'pivotlow',color =color.white)
slope = switch method 'Atr' => ta.atr(length)/length*k 'Stdev' => ta.stdev(src,length)/length*k 'Linreg' => math.abs(ta.sma(src*bar_index,length)-ta.sma(src,length)*ta.sma(bar_index,length))/ta.variance(n,length)/2*k
slope_ph := ph ? slope : slope_ph[1] slope_pl := pl ? slope : slope_pl[1]
upper := ph ? ph : upper[1] - slope_ph lower := pl ? pl : lower[1] + slope_pl //---- single_upper = 0 single_lower = 0 single_upper := src[length] > upper ? 0 : ph ? 1 : single_upper[1] single_lower := src[length] < lower ? 0 : pl ? 1 : single_lower[1] upper_breakout = single_upper[1] and src[length] > upper and (show ? src > src[length] : 1) lower_breakout = single_lower[1] and src[length] < lower and (show ? src < src[length] : 1) plotshape(upper_breakout ? low[length] : na,"Upper Break",shape.labelup,location.absolute,#26a69a,-length,text="B",textcolor=color.white,size=size.tiny) plotshape(lower_breakout ? high[length] : na,"Lower Break",shape.labeldown,location.absolute,#ef5350,-length,text="B",textcolor=color.white,size=size.tiny) //---- var line up_l = na var line dn_l = na var label recent_up_break = na var label recent_dn_break = na
if ph[1] line.delete(up_l[1]) label.delete(recent_up_break[1])
up_l := line.new(n-length-1,ph[1],n-length,upper,color=#26a69a, extend=extend.right,style=line.style_dashed)
if pl[1] line.delete(dn_l[1]) label.delete(recent_dn_break[1])
dn_l := line.new(n-length-1,pl[1],n-length,lower,color=#ef5350, extend=extend.right,style=line.style_dashed)
if ta.crossover(src,upper-slope_ph*length) label.delete(recent_up_break[1]) recent_up_break := label.new(n,low,'B',color=#26a69a, textcolor=color.white,style=label.style_label_up,size=size.small)
if ta.crossunder(src,lower+slope_pl*length) label.delete(recent_dn_break[1]) recent_dn_break := label.new(n,high,'B',color=#ef5350, textcolor=color.white,style=label.style_label_down,size=size.small)
//---- plot(upper,'Upper',color = ph ? na : #26a69a,offset=-length) plot(lower,'Lower',color = pl ? na : #ef5350,offset=-length)
alertcondition(ta.crossover(src,upper-slope_ph*length),'Upper Breakout','Price broke upper trendline') alertcondition(ta.crossunder(src,lower+slope_pl*length),'Lower Breakout','Price broke lower trendline')
def trix(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
TRIX.
def ultosc(self, time_period1: int = 7, time_period2: int = 14, time_period3: int = 28, array: bool = False) ‑> Union[float, numpy.ndarray]
-
Ultimate Oscillator.
def update_bar(self, bar) ‑> None
-
Update new bar data into array manager.
def ut_bot(self, dev: int, atr_period: int = 0, array: bool = False, input: numpy.ndarray = None)
def ut_bot_chatgpt(self, dev: int = 1, atr_period: int = 10, array: bool = False, input: numpy.ndarray = None)
def volume_osc(self, n_short: int = 1, n_long: int = 14, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]
-
Volume Oscillator
def where(self, condition, true_result: Union[float, numpy.ndarray], false_result: Union[float, numpy.ndarray], array: bool = False) ‑> Union[float, numpy.ndarray]
-
like np.where
def willr(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]
-
WILLR.
def wma(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]
-
WMA.
def wr(self, period: int = 14, array: bool = False, input: numpy.ndarray = None)
-
0-20: overbought -> can short 80-100: oversold -> can long
Williams Percent Range
//@version=5 indicator("Williams Percent Range", shorttitle="Williams %R", format=format.price, precision=2, timeframe="", timeframe_gaps=true) length = input(title="Length", defval=14) src = input(close, "Source") _pr(length) => max = ta.highest(length) min = ta.lowest(length) 100 * (src - max) / (max - min) percentR = _pr(length) obPlot = hline(-20, title="Upper Band", color=#787B86) hline(-50, title="Middle Level", linestyle=hline.style_dotted, color=#787B86) osPlot = hline(-80, title="Lower Band", color=#787B86) fill(obPlot, osPlot, title="Background", color=color.rgb(126, 87, 194, 90)) plot(percentR, title="%R", color=#7E57C2)
def zlsma(self, n: int, offset: int = 0, array: bool = False, input: numpy.ndarray = None)
-
Zero Lag Least Squared Moving Average