我们结合使用Apache Spark和Sparkube来动态创建在线分析环境。数据在Spark中准备,并显示为带有Sparkube的多维立方体。Sparkube自动发布简单的聚合(SUM,MIN,MAX,AVG,STD...)但是我们如何支持"Last Non empty“这种聚合呢?
以此数据集为例,定期记录各种产品的库存数量。2018年的库存不应该是当年库存记录的总和,而应该是该年内的最新库存记录。
Time,Product,Stock
2017-11-01, Oranges, 40000
2017-11-01, Apples, 120000
2017-12-01, Oranges, 42000
2017-12-01, Apples, 110000
2018-01-01, Oranges, 50000
2018-01-01, Apples, 100000
2018-02-01, Oranges, 48000
2018-02-01, Apples, 130000
2018-03-01, Oranges, 46000
2018-03-01, Apples, 120000发布于 2018-05-11 23:54:15
问题是,库存数量记录(或一般情况下的位置)不会随着时间的推移而汇总。但是数量上的变化是有影响的。因此,您可以使用Spark来计算库存数量变化,这将在一个立方体中一致地求和,然后从您的OLAP前端创建一个计算的度量,该度量将(重新)计算立方体中任何位置的实际库存。
在Spark中加载数据集并计算每个产品的库存变化
import org.apache.spark.sql.expressions.Window
var ds = spark.read
.format("csv")
.option("header","true")
.option("inferSchema","true")
.load("/path/to/stock.csv")
val ws = Window.partitionBy("Product").orderBy("Time")
val ds2 = ds
.withColumn("PreviousStockTmp", lag(col("Stock"), 1).over(ws))
.withColumn("PreviousStock", when($"PreviousStockTmp".isNull, 0).otherwise($"PreviousStockTmp"))
.drop("PreviousStockTmp")
.withColumn("StockVariation", col("Stock").minus(col("PreviousStock"))).orderBy("Time")添加通常的‘年’,‘月’,‘日’等。对联机分析有用的列
val ds3 = ds2
.withColumn("Year", year(col("Time")))
.withColumn("Month", month(col("Time")))
.withColumn("Day", dayofmonth(col("Time")))最后,使用Sparkube将数据集发布为立方体
import com.activeviam.sparkube._
new Sparkube().fromDataset(ds3).expose()现在您可以在Excel、Tableau或ActiveUI中浏览多维数据集,您可以在图表和透视表中使用"StockVariation.SUM“度量。在MDX中,您可以创建计算度量值,以根据变量计算库存:
WITH
Member [Measures].[Stock] AS (
(
[Measures].[Stock],
[Year].CurrentMember.PrevMember
) + [Measures].[StockVariation.SUM]
)
SELECT
NON EMPTY Crossjoin(
[Year].[Year].[Year].Members,
{
[Measures].[Stock]
}
) ON COLUMNS,
NON EMPTY [Product].[Product].[Product].Members ON ROWS
FROM [_sparkube_1]

https://stackoverflow.com/questions/50293708
复制相似问题