def ts_sum(self,x, window=10):
"""
Wrapper function to estimate rolling sum using TA-Lib.
"""
return talib.SUM(x, timeperiod=window)
def stddev(self,x, window=10):
"""
Wrapper function to estimate rolling standard deviation using TA-Lib.
"""
return talib.STDDEV(x, timeperiod=window, nbdev=1)
def covariance(self,x, y, window=10):
"""
Wrapper function to estimate rolling covariance using TA-Lib.
"""
return talib.CORREL(x, y, timeperiod=window) * talib.STDDEV(x, window) * talib.STDDEV(y, window)
def ts_min(self,x, window=10):
"""
Wrapper function to estimate rolling min using TA-Lib.
"""
return talib.MIN(x, timeperiod=window)
def ts_max(self,x, window=10):
"""
Wrapper function to estimate rolling max using TA-Lib.
"""
return talib.MAX(x, timeperiod=window)
def delta(self,x, period=1):
"""
Wrapper function to estimate difference using TA-Lib.
"""
return talib.MOM(x, timeperiod=period)
def ts_argmin(self,x, window=10):
"""
更高效的 NumPy 实现版本
"""
if len(x) < window:
return np.full_like(x, np.nan)
# 创建滑动窗口视图 (避免复制数据)
shape = x.shape[:-1] + (x.shape[-1] - window + 1, window)
strides = x.strides + (x.strides[-1],)
windows = np.lib.stride_tricks.as_strided(x, shape=shape, strides=strides)
# 计算每个窗口的argmin
min_indices = np.argmin(windows, axis=-1)
# 构建结果数组
result = np.empty_like(x, dtype=float)
result[:window - 1] = np.nan
result[window - 1:] = min_indices + 1 # 转换为1-based
return result
def ts_argmax(self,x, window=10):
"""
更高效的 NumPy 实现版本(使用 stride_tricks 创建滑动窗口视图)
"""
if len(x) < window:
return np.full_like(x, np.nan)
# 创建滑动窗口视图 (避免复制数据)
shape = x.shape[:-1] + (x.shape[-1] - window + 1, window)
strides = x.strides + (x.strides[-1],)
windows = np.lib.stride_tricks.as_strided(x, shape=shape, strides=strides)
# 计算每个窗口的argmax
max_indices = np.argmax(windows, axis=-1)
# 构建结果数组
result = np.empty_like(x, dtype=float)
result[:window - 1] = np.nan
result[window - 1:] = max_indices + 1 # 转换为1-based
return result
def delay(self,x, window=1):
"""
"""
# 使用 np.roll 进行数据平移,axis=0 按行平移
result = np.roll(x, shift=window, axis=0)
return result
def decay_linear(self,x, window=10):
"""
Linear weighted moving average implementation using TA-Lib.
"""
return talib.WMA(x, timeperiod=window)
def rolling_prod(self,x):
"""
Auxiliary function to be used in pd.rolling_apply
:param na: numpy array.
:return: The product of the values in the array.
"""
return np.prod(x)
def ts_rank(self,x, window=10):
return self.rank(x[-window:],pct=True)
def rank(self,arr: np.ndarray, pct: bool = False, method: str = "average") -> np.ndarray:
"""
Rank a 1D numpy array with handling of ties.
:param arr: A numpy ndarray (1D array).
:param pct: If True, return ranks as percentage (0 to 1).
:param method: How to handle ties. Options are: "average", "min", "max", "first".
:return: A numpy ndarray with ranks.
"""
# 获取数组的长度
n = arr.shape[0]
# 使用argsort来获取排序后的索引
sorted_indices = np.argsort(arr)
sorted_arr = arr[sorted_indices]
# 获取排名:原始位置上的排名
ranks = np.zeros_like(arr, dtype=float)
if method == "average":
# 对相同数值给出平均排名
# 计算排序后每个值的排名
unique_vals, unique_idx = np.unique(sorted_arr, return_index=True)
rank_values = np.argsort(sorted_arr) + 1 # 计算排序后的排名
# 对于每个重复的元素,计算其平均排名
for i in range(len(unique_vals)):
same_val_indices = np.where(sorted_arr == unique_vals[i])[0]
avg_rank = np.mean(rank_values[same_val_indices])
ranks[sorted_indices[same_val_indices]] = avg_rank
elif method == "min":
# 对相同数值给出最小排名
rank_values = np.argsort(np.argsort(sorted_arr)) + 1
for i in range(n):
ranks[sorted_indices[i]] = rank_values[i]
# 修复:使用最小排名
for i in range(n):
ranks[sorted_indices[i]] = min(rank_values[i], ranks[sorted_indices[i]])
elif method == "max":
# 对相同数值给出最大排名
rank_values = np.argsort(np.argsort(sorted_arr)) + 1
for i in range(n):
ranks[sorted_indices[i]] = rank_values[i]
# 修复:使用最大排名
for i in range(n):
ranks[sorted_indices[i]] = max(rank_values[i], ranks[sorted_indices[i]])
elif method == "first":
# 对相同数值按其在数组中的出现顺序排序
rank_values = np.argsort(np.argsort(sorted_arr)) + 1
for i in range(n):
ranks[sorted_indices[i]] = rank_values[i]
# 如果pct为True,将排名转化为百分比
ranks = (ranks - 1) / (n - 1) if pct else ranks
return ranks
def scale(self,x, k=1):
"""
使用 NumPy 实现的 scale 函数
返回:缩放后的数组
"""
abs_sum = np.sum(np.abs(x))
# 避免除以零
if abs_sum == 0:
return np.zeros_like(x)
return x * (k / abs_sum)