◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。
从一个引人注目的标题“spark 为什么这么慢?”开始,值得注意的是,称 spark“慢”可能意味着多种含义。聚合速度慢吗?数据加载?存在不同的情况。此外,“spark”是一个广泛的术语,其性能取决于编程语言和使用上下文等因素。因此,在深入讨论之前,让我们将标题改进得更加精确。
由于我主要在 databricks 上使用 spark 和 python,因此我将进一步缩小范围。
优化后的标题将是:
“spark 的第一印象:‘听说它很快,但为什么感觉很慢?’初学者的视角”
作为广泛使用 pandas、numpy 和机器学习库的人,我钦佩 spark 通过并行和分布式处理处理大数据的能力。当我最终在工作中使用 spark 时,我对它看起来比 pandas 慢的场景感到困惑。不确定出了什么问题,我发现了一些见解并想与大家分享。
我们简单介绍一下spark的基本架构。
(集群模式概述)
spark 集群由执行实际处理的 工作节点和协调和计划执行的驱动程序节点组成。这种架构会影响下面讨论的所有内容,因此请记住这一点。
现在,进入要点。
spark 针对大规模数据处理进行了优化,但它也可以处理小型数据集。然而,看看这个基准:
(在单节点机器上对 apache spark 进行基准测试)
结果表明,对于 15gb 以下的数据集,pandas 在聚合任务中优于 spark。为什么?简而言之,spark 优化的开销超过了小数据集的好处。
该链接显示了 spark 并不慢的情况,但这些情况通常处于本地集群模式。对于独立设置,由于节点之间的网络通信开销,较小的数据集可能是一个缺点。
spark 采用惰性求值,这意味着转换不会立即执行,而是推迟到某个操作(例如收集、计数、显示)触发计算为止。
示例(熊猫):
df = spark.read.table("tpch.lineitem").limit(1000).topandas() df["l_tax_percentage"] = df["l_tax"] * 100 for l_orderkey, group_df in df.groupby("l_orderkey"): print(l_orderkey, group_df["l_tax_percentage"].mean())
执行时间:3.04秒
spark 中的等效项:
from pyspark.sql import functions as f sdf = spark.read.table("tpch.lineitem").limit(1000) sdf = sdf.withcolumn("l_tax_percentage", f.col("l_tax") * 100) for row in sdf.select("l_orderkey").distinct().collect(): grouped_sdf = sdf.filter(f.col("l_orderkey") == row.l_orderkey).groupby("l_orderkey").agg( f.mean("l_tax_percentage").alias("avg_l_tax_percentage") ) print(grouped_sdf.show())
执行时间:3分钟后仍在运行。
为什么?
spark 代码在 pandas 中有效地执行了此操作:
for l_orderkey, group_df in df.groupby("l_orderkey"): df["l_tax_percentage"] = df["l_tax"] * 100 print(l_orderkey, group_df["l_tax_percentage"].mean())
通过使用 spark 的缓存或重构逻辑以最大程度地减少重复计算来避免此类模式。
https://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations
随机播放 当数据在 workers 之间重新分配时发生,通常是在 groupbykey、join 或重新分区等操作期间。随机播放可能会很慢,原因是:
例如,拥有更多 worker 并不总能提高洗牌期间的性能。
您觉得这有帮助吗?如果有效使用,spark 是一个出色的工具。除了加速大规模数据处理之外,spark 还以其可扩展的资源管理而大放异彩,尤其是在云中。
尝试 spark 来优化您的数据运营和管理!
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。