Module areixio.utils.indicator

Classes

class BarGenerator (on_bar: Callable = None, window: int = 0, on_window_bar: Callable = None, hour_window: bool = False, strict_agg: bool = False)
  1. generating 1 minute bar data from tick data
  2. generating x minute bar/x hour bar data from 1 minute data

Args

on_bar : Callable, optional
callback function when new bar generated. Defaults to None.
window : int, optional
window size. Defaults to 0. If hour_window is False, this is the x of an x-minute bar, and x must evenly divide 60: 2, 3, 5, 6, 10, 15, 20, 30. If hour_window is True, this is the x of an x-hour bar, and x can be any number.
on_window_bar : Callable, optional
callback function when new window bar generated. Defaults to None.
hour_window : bool, optional
whether the generator is for generating hour-window bars. Defaults to False.

Class variables

var hour_window_map

Methods

def update_bar(self, bar: Union[dict, pandas.core.series.Series]) ‑> None

Update new bar data into array manager.

def update_bar_hour_window(self, bar) ‑> None
  • 1 minute bar to 1 hour bar
  • 1 minute bar to 4 hour bar
  • 15 minute bar to 1 hour bar
  • 15 minute bar to 4 hour bar
  • 1 hour bar to 1 hour bar
  • 1 hour bar to 4 hour bar
  • 2 hour bar to 4 hour bar

1: 0, 1, 2, 3, 4, 5, 6, 7, ..., 22, 23; 2: 0, 2, 4, 6, 8, ..., 20, 22; 4: 0, 4, 8, 12, 16, 20; 6: 2, 8, 14, 20; 8: 0, 8, 16; 12: 0, 12

def update_bar_minute_window(self, bar) ‑> None
def update_tick(self, tick: Union[dict, pandas.core.series.Series]) ‑> None

Update new tick data into generator.

class Indicator (size: int = 100, window: int = 1, hour_window: bool = False, strict_agg: bool = False, datafeed=None)

Args

size : int, optional
store size. Defaults to 100.
window : int, optional
time window. Defaults to 1.
hour_window : bool, optional
if True, bar will be hour bar, otherwise, it is minute bar. Defaults to False.
strict_agg : bool, optional
mainly for 4h & 6h bar data; if True, the aggregated datetime will be the same as the exchange's. Defaults to False.
datafeed : _type_, optional
if specified, will automatically download `size` bars of data so that the indicator is successfully initialized. Defaults to None.

Raises

ImportError
if ta-lib is not installed

Instance variables

var close : numpy.ndarray

Get close price time series.

var high : numpy.ndarray

Get high price time series.

var low : numpy.ndarray

Get low price time series.

var open : numpy.ndarray

Get open price time series.

var ticks : numpy.ndarray

Get datetime time series.

var turnover : numpy.ndarray

Get trading turnover time series.

var volume : numpy.ndarray

Get trading volume time series.

var zeros : numpy.ndarray

Methods

def ad(self, array: bool = False) ‑> Union[float, numpy.ndarray]

Chaikin A/D Line

def adosc(self, fast_period: int, slow_period: int, array: bool = False) ‑> Union[float, numpy.ndarray]

Chaikin A/D Oscillator AKA Chaikin Money Flow

def adx(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

ADX.

def adx1(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

ADX.

TrueRange = max(max(high-low, abs(high-nz(close[1]))), abs(low-nz(close[1])))
DirectionalMovementPlus = high-nz(high[1]) > nz(low[1])-low ? max(high-nz(high[1]), 0): 0
DirectionalMovementMinus = nz(low[1])-low > high-nz(high[1]) ? max(nz(low[1])-low, 0): 0

SmoothedTrueRange = 0.0
SmoothedTrueRange[i] = nz(SmoothedTrueRange[1]) - (nz(SmoothedTrueRange[1])/len) + TrueRange

SmoothedDirectionalMovementPlus = 0.0
SmoothedDirectionalMovementPlus[i] = nz(SmoothedDirectionalMovementPlus[1]) - (nz(SmoothedDirectionalMovementPlus[1])/len) + DirectionalMovementPlus

SmoothedDirectionalMovementMinus = 0.0
SmoothedDirectionalMovementMinus[i] = nz(SmoothedDirectionalMovementMinus[1]) - (nz(SmoothedDirectionalMovementMinus[1])/len) + DirectionalMovementMinus

DIPlus = SmoothedDirectionalMovementPlus / SmoothedTrueRange * 100
DIMinus = SmoothedDirectionalMovementMinus / SmoothedTrueRange * 100
DX = abs(DIPlus-DIMinus) / (DIPlus+DIMinus)*100
ADX = sma(DX, len)
def adxr(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

ADXR.

def ahr999(self) ‑> Union[float, numpy.ndarray]

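ahr999 index = (BTC price / 200-day DCA cost) * (BTC price / exponential growth valuation), where: 200-day DCA cost = mean of the BTC price over the past 200 days; exponential growth valuation = 10^(5.84 * log(coin age) - 17.01); coin age = number of days elapsed since 2009-01-03 (regarded as the birthday of BTC; timestamp: 1230940800).

When ahr999 < 0.45 it is a good time to buy the dip; when 0.45 < ahr999 < 1.2 it is suitable for dollar-cost averaging into BTC; when ahr999 > 1.2 it is not a good time to invest in BTC.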

def alligator(self, jaw_length: int = 13, teeth_length: int = 8, lips_length: int = 5, jaw_offset: int = 8, teeth_offset: int = 5, lips_offset: int = 3, array: bool = False, input: numpy.ndarray = None)
def apo(self, fast_period: int, slow_period: int, matype: int = 0, array: bool = False) ‑> Union[float, numpy.ndarray]

APO.

def aroon(self, n: int, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[float, float]]

Aroon indicator.

def aroonosc(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

Aroon Oscillator.

def atr(self, n: int, multiplier: float = 1, array: bool = False, matype: int = 0) ‑> Union[float, numpy.ndarray]

Average True Range (ATR).

def av2(self, ma_period: int, ma_period_smoothing: float, ma_type: int = 0, array: bool = False, input: numpy.ndarray = None)

Smoothed Heikin Ashi Cloud A-V2: https://www.tradingview.com/script/DTDQ3y76/ B-V2: https://www.tradingview.com/script/acmdQs2A/

An overlay showing “Smoothed Heikin Ashi” that measures the momentum of the trend and identifies potential trend rejections.

def avg(self, *args) ‑> float

avg

def bband(self, n: int, dev: float, matype: int = 0, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[float, float]]

Bollinger Channel.

def boll(self, n: int, dev: float, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[float, float]]

Bollinger Channel.

def bop(self, array: bool = False) ‑> Union[float, numpy.ndarray]

BOP.

def cci(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

Commodity Channel Index (CCI).

def chandelier_exit(self, atr_length: int, atr_multiplier: float, array: bool = False, input: numpy.ndarray = None)

Chandelier Exit

longStop = (useClose ? highest(close, length) : highest(length)) - atr longStopPrev = nz(longStop[1], longStop) longStop[i] = close[1] > longStopPrev ? max(longStop, longStopPrev) : longStop

shortStop = (useClose ? lowest(close, length) : lowest(length)) + atr shortStopPrev = nz(shortStop[1], shortStop) shortStop[i] = close[1] < shortStopPrev ? min(shortStop, shortStopPrev) : shortStop

var int dir = 1 dir[i] = close > shortStopPrev ? 1 : close < longStopPrev ? -1 : dir

buySignal = dir == 1 and dir[1] == -1 sellSignal = dir == -1 and dir[1] == 1

buypx = buySignal ? longStop : na sellpx = sellSignal ? shortStop : na

def change(self, source: numpy.ndarray) ‑> float
def cmo(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

CMO.

def crossover(self, source1: numpy.ndarray, source2: numpy.ndarray) ‑> bool
def crossunder(self, source1: numpy.ndarray, source2: numpy.ndarray) ‑> bool
def custom(self, func: Callable, array: bool = False, *args, **kwargs) ‑> Union[float, numpy.ndarray]

Custom func

Note

All the inputs need to have the same shape. The order of the inputs must follow the order of the func parameters.

def dema(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]

Double EMA

def donchian(self, n: int, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[float, float]]

Donchian Channel.

def dx(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

DX.

def ema(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]

Exponential moving average.

def ha(self, is_simple: bool = False, array: bool = False, input_open: numpy.ndarray = None, input_high: numpy.ndarray = None, input_low: numpy.ndarray = None, input_close: numpy.ndarray = None) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float, float]]

Heikin Ashi Candles (HA)

def halftrend(self, amplitude: int = 4, deviation: int = 2, array: bool = False, input: numpy.ndarray = None)
def hama(self, ma_open_period: int = 25, ma_high_period: int = 20, ma_low_period: int = 20, ma_close_period: int = 20, ma_type: int = 0, array: bool = False, input: numpy.ndarray = None)
def heatmap_volume(self, ma_length: int = 610, std_length: int = 610, extra_high_threshold: float = 4, high_threshold: float = 2.5, medium_threshold: float = 1, normal_threshold: float = -0.5, array: bool = False, input: numpy.ndarray = None)

extra_high high medium normal low

length := length > bar_index + 1 ? bar_index + 1 : length slength := slength > bar_index + 1 ? bar_index + 1 : slength

pstdev(Series, Period) => mean = sum(Series, Period) / Period summation = 0.0 for i=0 to Period-1 sampleMinusMean = nz(Series[i]) - mean summation := summation + sampleMinusMean * sampleMinusMean return = sqrt(summation / Period)

mean = sma(volume, length) std = pstdev(volume, slength) stdbar = (volume - mean) / std dir = close > open v = osc ? volume - mean : volume mosc = osc ? 0 : mean

bcolor = stdbar > thresholdExtraHigh ? dir ? cthresholdExtraHighUp : cthresholdExtraHighDn : stdbar > thresholdHigh ? dir ? cthresholdHighUp : cthresholdHighDn : stdbar > thresholdMedium ? dir ? cthresholdMediumUp : cthresholdMediumDn : stdbar > thresholdNormal ? dir ? cthresholdNormalUp : cthresholdNormalDn : dir ? cthresholdLowUp : cthresholdLowDn

def highest(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]

highest.

def hl2(self, array: bool = False) ‑> Union[float, numpy.ndarray]

(high + low ) / 2

def hlc3(self, array: bool = False) ‑> Union[float, numpy.ndarray]

(high + low + close) / 3

def hma(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]

HMA - Hull moving average. HMA = WMA(2 * WMA(n/2) - WMA(n), sqrt(n))

def ichimoku(self, tenkan_sen_period: int = 9, kijun_sen_period: int = 26, senkou_span_period: int = 52, chikou_span_period: int = 26, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float, float, float]]

Ichimoku Cloud (Ichimoku)

def ichimoku1(self, tenkan_sen_period: int = 9, kijun_sen_period: int = 26, senkou_span_period: int = 52, chikou_span_period: int = 26, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float, float, float]]

Ichimoku Cloud (Ichimoku)

def impulse_macd(self, ma_period: int = 34, signal_period: int = 9, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float]]

Impulse Macd

def kama(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

KAMA.

def kdj(self, n: int, smooth_k: int, smooth_d: int, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float]]

Stochastic Relative Strength Index

def kdj0(self, n: int, smooth_k: int, smooth_d: int, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float]]

Stochastic Relative Strength Index

def kdj1(self, n: int, fastk_period: int, fastd_period: int, fastd_matype: int = 0, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float]]

KDJ

def keltner(self, n: int, dev: float, atr_length: int = None, matype: int = 0, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[float, float]]

Keltner Channel.

def lowest(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]

lowest.

def macd(self, fast_period: int = 13, slow_period: int = 34, signal_period: int = 9, matype: int = 0, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float]]

MACD. 0-SMA; 1-EMA

def macd_divergence(self, fast_period: int = 13, slow_period: int = 34, signal_period: int = 9, matype: int = 0, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray], Tuple[float, float, float]]
def max(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]

max.

def maximum(self, input1: numpy.ndarray, input2: numpy.ndarray, array: bool = False) ‑> Union[float, numpy.ndarray]

maximum.

def mfi(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

Money Flow Index.

def min(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]

min.

def minimum(self, input1: numpy.ndarray, input2: numpy.ndarray, array: bool = False) ‑> Union[float, numpy.ndarray]

minimum.

def minus_di(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

MINUS_DI.

def minus_dm(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

MINUS_DM.

def mom(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

MOM.

def na(self, source: float) ‑> bool
def natr(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

NATR.

def nz(self, source1: float, source2: float) ‑> float
def obv(self, array: bool = False) ‑> Union[float, numpy.ndarray]

On Balance Volume

def pine_sar(self, start: float = 0.02, inc: float = 0.02, maximum: float = 0.2)
def pivothigh(self, left, right=0)

https://blog.csdn.net/sumubaiblog/article/details/122095719 The lowest and highest points inside the channel; the highs and lows of a ranging (oscillating) market; RSI divergence.

def pivotlow(self, left, right=0)
def plus_di(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

PLUS_DI.

def plus_dm(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

PLUS_DM.

def ppo(self, fast_period: int, slow_period: int, matype: int = 0, array: bool = False) ‑> Union[float, numpy.ndarray]

PPO.

def qqe(self, rsi_period: int, rsi_smoothing: int, qqe_factor: int, array: bool = False, input: numpy.ndarray = None)

Quantitative Qualitative Estimation (QQE)

The Quantitative Qualitative Estimation (QQE) is similar to SuperTrend but uses a Smoothed RSI with an upper and lower bands. The band width is a combination of a one period True Range of the Smoothed RSI which is double smoothed using Wilder's smoothing length (2 * rsiLength - 1) and multiplied by the default factor of 4.236. A Long trend is determined when the Smoothed RSI crosses the previous upperband and a Short trend when the Smoothed RSI crosses the previous lowerband.

Based on QQE.mq5 by EarnForex Copyright © 2010, based on version by Tim Hyder (2008), based on version by Roman Ignatov (2006)

Sources

https://www.tradingview.com/script/IYfA9R2k-QQE-MT4/ https://www.tradingpedia.com/forex-trading-indicators/quantitative-qualitative-estimation https://www.prorealcode.com/prorealtime-indicators/qqe-quantitative-qualitative-estimation/

Calculation

Default Inputs: length=14, smooth=5, factor=4.236, mamode="ema", drift=1

Args

close : pd.Series
Series of 'close's
length : int
RSI period. Default: 14
smooth : int
RSI smoothing period. Default: 5
factor : float
QQE Factor. Default: 4.236
mamode : str
See help(ta.ma). Default: 'ema'
drift : int
The difference period. Default: 1
offset : int
How many periods to offset the result. Default: 0

Kwargs

fillna (value, optional): pd.DataFrame.fillna(value) fill_method (value, optional): Type of fill method

Returns

pd.DataFrame
QQE, RSI_MA (basis), QQEl (long), and QQEs (short) columns.

qqe_df = cal_qqe(pd.Series(indicator.close), length=self.rsi_length, smooth=self.rsi_smoothing, factor=self.fast_qqe_factor, ).fillna(0.0)

qqe_df = qqe_df if qqe_df is None else qqe_df.fillna(0.0)

print(qqe_df,'????')

qqe_df = cal_qqe(hist.close, )

qqe_long = qqe_df[f"QQEl_{self.rsi_length}{self.rsi_smoothing}"].iloc[-1] qqe_long = qqe_long - 50 if qqe_long else qqe_long qqe_short = qqe_df[f"QQEs_{self.rsi_length}{self.rsi_smoothing}"].iloc[-1] qqe_short = qqe_short - 50 if qqe_short else qqe_short

def rh_inventory_retracement_bar(self, z: int = 45, array: bool = False, input: numpy.ndarray = None)
def rh_overlay_set(self, array: bool = False, input: numpy.ndarray = None)

https://cn.tradingview.com/script/al4ABhwU-Rob-Hoffman-Overlay-Set/

plot(a, title = "Fast Speed Line", linewidth = 2, color = #0000FF) plot(b, title = "Slow Speed Line", linewidth = 2, color = fuchsia) plot(c, title = "Fast Primary Trend Line", linewidth = 3, color = #00FF00) plot(d, title = "Slow Primary Trend Line", linewidth = 3, color = #000000) plot(e, title = "Trend Line - 1", linewidth = 3, color = #0000FF, style = circles) plot(f, title = "Trend Line - 2", linewidth = 3, color = #20B2AA) plot(g, title = "Trend Line - 3", linewidth = 3, color = #FF4500) plot(h, title = "Trend Line - 4", linewidth = 3, color = fuchsia)

plot(k, title = "No Trend Zone - Midline", linewidth = 2, color = #3CB371) plot(ku, title = "No Trend Zone - Upperline", linewidth = 2, color = #3CB371) plot(kl, title = "No Trend Zone - Lowerline", linewidth = 2, color = #3CB371)

def rma(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]

wildeR's Moving Average (RMA)

def roc(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

ROC.

def rocp(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

ROCP.

def rocr(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

ROCR.

def rocr_100(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

ROCR100.

def rsi(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

Relative Strength Index (RSI).

def sar(self, acceleration: int = 0, maximum: int = 0, array: bool = False) ‑> Union[float, numpy.ndarray]

SAR.

start: 0.02, increment: 0.02, maximum: 0.2

def sarext(self, startvalue: int = 0, offsetonreverse: int = 0, accelerationinitlong: int = 0, accelerationlong: int = 0, accelerationmaxlong: int = 0, accelerationinitshort: int = 0, accelerationshort: int = 0, accelerationmaxshort: int = 0, array: bool = False) ‑> Union[float, numpy.ndarray]

SAR Extended.

start: 0.02, increment: 0.02, maximum: 0.2

def shift(self, n: int, input: numpy.ndarray, fill_value=nan)

shift data

def sma(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]

Simple moving average.

def smma(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]

Smoothed moving average.

def ssl_channel(self, ma_high_period: int, ma_low_period: int, array: bool = False, input: numpy.ndarray = None)

SSL Channel

def stc(self, period: int = 12, fast_period: int = 26, slow_period: int = 50, multiplier: float = 0.5, array: bool = False, input: numpy.ndarray = None)

https://cn.tradingview.com/script/WhRRThMI-STC-Indicator-A-Better-MACD-SHK/

AAA = input(0.5) var CCCCC = 0.0 var DDD = 0.0 var DDDDDD = 0.0 var mAAAAA = 0.0

fastMA = ta.ema(close, BBBB) slowMA = ta.ema(close, BBBBB) BBBBBB = fastMA - slowMA

CCC = ta.lowest(BBBBBB, EEEEEE) CCCC = ta.highest(BBBBBB, EEEEEE) - CCC CCCCC := CCCC > 0 ? (BBBBBB - CCC) / CCCC * 100 : nz(CCCCC[1]) DDD := na(DDD[1]) ? CCCCC : DDD[1] + AAA * (CCCCC - DDD[1]) DDDD = ta.lowest(DDD, EEEEEE) DDDDD = ta.highest(DDD, EEEEEE) - DDDD DDDDDD := DDDDD > 0 ? (DDD - DDDD) / DDDDD * 100 : nz(DDDDDD[1]) mAAAAA := na(mAAAAA[1]) ? DDDDDD : mAAAAA[1] + AAA * (DDDDDD - mAAAAA[1])

mColor = mAAAAA > mAAAAA[1] ? color.new(color.green, 20) : color.new(color.red, 20)

def stc_chatgpt(self, period: int = 12, fast_period: int = 26, slow_period: int = 50, multiplier: float = 0.5, array: bool = False, input: numpy.ndarray = None)
def std(self, n: int, nbdev: int = 1, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]

Standard deviation.

def stoch(self, fastk_period: int, slowk_period: int, slowk_matype: int, slowd_period: int, slowd_matype: int, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[float, float]]

Stochastic Indicator

def sum(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]

Summation.

def supertrend(self, period, multiplier: int = 1, array: bool = False) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[float, str]]

SuperTrend

SuperTrend Algorithm :

BASIC UPPERBAND = (HIGH + LOW) / 2 + Multiplier * ATR
BASIC LOWERBAND = (HIGH + LOW) / 2 - Multiplier * ATR

FINAL UPPERBAND = IF( (Current BASICUPPERBAND < Previous FINAL UPPERBAND) or (Previous Close > Previous FINAL UPPERBAND))
                    THEN (Current BASIC UPPERBAND) ELSE Previous FINALUPPERBAND)
FINAL LOWERBAND = IF( (Current BASIC LOWERBAND > Previous FINAL LOWERBAND) or (Previous Close < Previous FINAL LOWERBAND))
                    THEN (Current BASIC LOWERBAND) ELSE Previous FINAL LOWERBAND)

SUPERTREND = IF((Previous SUPERTREND = Previous FINAL UPPERBAND) and (Current Close <= Current FINAL UPPERBAND)) THEN
                Current FINAL UPPERBAND
            ELSE
                IF((Previous SUPERTREND = Previous FINAL UPPERBAND) and (Current Close > Current FINAL UPPERBAND)) THEN
                    Current FINAL LOWERBAND
                ELSE
                    IF((Previous SUPERTREND = Previous FINAL LOWERBAND) and (Current Close >= Current FINAL LOWERBAND)) THEN
                        Current FINAL LOWERBAND
                    ELSE
                        IF((Previous SUPERTREND = Previous FINAL LOWERBAND) and (Current Close < Current FINAL LOWERBAND)) THEN
                            Current FINAL UPPERBAND
def tr(self, array: bool = False) ‑> Union[float, numpy.ndarray]

True Range (TR).

def trange(self, array: bool = False) ‑> Union[float, numpy.ndarray]

TRANGE.

def trendlines_with_breaks(self, length: int = 14, slope: float = 1, method: str = 'atr', array: bool = False, input: numpy.ndarray = None)

indicator("Trendlines with Breaks [LuxAlgo]",overlay=true) length = input.int(14) k = input.float(1.,'Slope',minval=0,step=.1) method = input.string('Atr','Slope Calculation Method', options=['Atr','Stdev','Linreg']) show = input(false,'Show Only Confirmed Breakouts') //---- upper = 0.,lower = 0. slope_ph = 0.,slope_pl = 0. src = close n = bar_index //---- ph = ta.pivothigh(length,length) pl = ta.pivotlow(length,length) plot(ph,'pivothigh',color =color.white) plot(pl,'pivotlow',color =color.white)

slope = switch method 'Atr' => ta.atr(length)/length*k 'Stdev' => ta.stdev(src,length)/length*k 'Linreg' => math.abs(ta.sma(src*bar_index,length)-ta.sma(src,length)*ta.sma(bar_index,length))/ta.variance(n,length)/2*k

slope_ph := ph ? slope : slope_ph[1] slope_pl := pl ? slope : slope_pl[1]

upper := ph ? ph : upper[1] - slope_ph lower := pl ? pl : lower[1] + slope_pl //---- single_upper = 0 single_lower = 0 single_upper := src[length] > upper ? 0 : ph ? 1 : single_upper[1] single_lower := src[length] < lower ? 0 : pl ? 1 : single_lower[1] upper_breakout = single_upper[1] and src[length] > upper and (show ? src > src[length] : 1) lower_breakout = single_lower[1] and src[length] < lower and (show ? src < src[length] : 1) plotshape(upper_breakout ? low[length] : na,"Upper Break",shape.labelup,location.absolute,#26a69a,-length,text="B",textcolor=color.white,size=size.tiny) plotshape(lower_breakout ? high[length] : na,"Lower Break",shape.labeldown,location.absolute,#ef5350,-length,text="B",textcolor=color.white,size=size.tiny) //---- var line up_l = na var line dn_l = na var label recent_up_break = na var label recent_dn_break = na

if ph[1] line.delete(up_l[1]) label.delete(recent_up_break[1])

up_l := line.new(n-length-1,ph[1],n-length,upper,color=#26a69a,
extend=extend.right,style=line.style_dashed)

if pl[1] line.delete(dn_l[1]) label.delete(recent_dn_break[1])

dn_l := line.new(n-length-1,pl[1],n-length,lower,color=#ef5350,
extend=extend.right,style=line.style_dashed)

if ta.crossover(src,upper-slope_ph*length) label.delete(recent_up_break[1]) recent_up_break := label.new(n,low,'B',color=#26a69a, textcolor=color.white,style=label.style_label_up,size=size.small)

if ta.crossunder(src,lower+slope_pl*length) label.delete(recent_dn_break[1]) recent_dn_break := label.new(n,high,'B',color=#ef5350, textcolor=color.white,style=label.style_label_down,size=size.small)

//---- plot(upper,'Upper',color = ph ? na : #26a69a,offset=-length) plot(lower,'Lower',color = pl ? na : #ef5350,offset=-length)

alertcondition(ta.crossover(src,upper-slope_ph*length),'Upper Breakout','Price broke upper trendline') alertcondition(ta.crossunder(src,lower+slope_pl*length),'Lower Breakout','Price broke lower trendline')

def trix(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

TRIX.

def ultosc(self, time_period1: int = 7, time_period2: int = 14, time_period3: int = 28, array: bool = False) ‑> Union[float, numpy.ndarray]

Ultimate Oscillator.

def update_bar(self, bar) ‑> None

Update new bar data into array manager.

def ut_bot(self, dev: int, atr_period: int = 0, array: bool = False, input: numpy.ndarray = None)
def ut_bot_chatgpt(self, dev: int = 1, atr_period: int = 10, array: bool = False, input: numpy.ndarray = None)
def volume_osc(self, n_short: int = 1, n_long: int = 14, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]

Volume Oscillator

def where(self, condition, true_result: Union[float, numpy.ndarray], false_result: Union[float, numpy.ndarray], array: bool = False) ‑> Union[float, numpy.ndarray]

like np.where

def willr(self, n: int, array: bool = False) ‑> Union[float, numpy.ndarray]

WILLR.

def wma(self, n: int, array: bool = False, input: numpy.ndarray = None) ‑> Union[float, numpy.ndarray]

WMA.

def wr(self, period: int = 14, array: bool = False, input: numpy.ndarray = None)

0-20: overbought -> can short; 80-100: oversold -> can long

Williams Percent Range

//@version=5 indicator("Williams Percent Range", shorttitle="Williams %R", format=format.price, precision=2, timeframe="", timeframe_gaps=true) length = input(title="Length", defval=14) src = input(close, "Source") _pr(length) => max = ta.highest(length) min = ta.lowest(length) 100 * (src - max) / (max - min) percentR = _pr(length) obPlot = hline(-20, title="Upper Band", color=#787B86) hline(-50, title="Middle Level", linestyle=hline.style_dotted, color=#787B86) osPlot = hline(-80, title="Lower Band", color=#787B86) fill(obPlot, osPlot, title="Background", color=color.rgb(126, 87, 194, 90)) plot(percentR, title="%R", color=#7E57C2)

def zlsma(self, n: int, offset: int = 0, array: bool = False, input: numpy.ndarray = None)

Zero Lag Least Squared Moving Average