Recently a colleague on the neighboring team hit an error like the following while running a model; the key part is the lapack.dpotrs assertion failure in the trace below.
client token: N/A
diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 38, hbase10): java.lang.AssertionError: assertion failed: lapack.dpotrs returned 4.
    at scala.Predef$.assert(Predef.scala:179)
    at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:40)
    at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:439)
    at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$m
The problem can be traced to the following spot in the Spark source code:
private def checkReturnValue(info: intW, method: String): Unit = {
  info.`val` match {
    case code if code < 0 =>
      throw new IllegalStateException(s"LAPACK.$method returned $code; arg ${-code} is illegal")
    case code if code > 0 =>
      throw new SingularMatrixException(
        s"LAPACK.$method returned $code because A is not positive definite. Is A derived from " +
        "a singular matrix (e.g. collinear column values)?")
    case _ => // do nothing
  }
}
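To make the meaning of that positive return code concrete, here is a small hand-rolled sketch (plain Scala, not Spark's or LAPACK's implementation) that factors a symmetric matrix the same way and returns a LAPACK-style info code. The input matrix is singular because its columns are collinear, which is exactly the situation the exception message warns about.

object CholeskyInfoDemo {
  // Factor a symmetric k x k matrix a (row-major) into L * L^T.
  // Returns 0 on success, or i (1-based) if the i-th leading minor is not positive definite,
  // mimicking the convention of a positive LAPACK "info" value.
  def cholesky(a: Array[Double], k: Int): Int = {
    val l = Array.ofDim[Double](k, k)
    for (i <- 0 until k; j <- 0 to i) {
      var s = a(i * k + j)
      for (p <- 0 until j) s -= l(i)(p) * l(j)(p)
      if (i == j) {
        if (s <= 0.0) return i + 1   // not positive definite: same meaning as info > 0
        l(i)(i) = math.sqrt(s)
      } else {
        l(i)(j) = s / l(j)(j)
      }
    }
    0
  }

  def main(args: Array[String]): Unit = {
    // A^T A for a matrix with collinear columns: [[1, 2], [2, 4]] is singular.
    val info = cholesky(Array(1.0, 2.0, 2.0, 4.0), 2)
    println(s"info = $info")  // prints: info = 2
  }
}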
The runtime environment was still Spark 1.6; the code walkthrough here is based on 2.1.1, but the relevant interfaces differ very little between the two versions. The core code is as follows:
def train[ID: ClassTag]( // scalastyle:ignore
    ratings: RDD[Rating[ID]],
    rank: Int = 10,
    numUserBlocks: Int = 10,
    numItemBlocks: Int = 10,
    maxIter: Int = 10,
    regParam: Double = 0.1,
    implicitPrefs: Boolean = false,
    alpha: Double = 1.0,
    nonnegative: Boolean = false,
    intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
    finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
    checkpointInterval: Int = 10,
    seed: Long = 0L)(
    implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = {
  require(!ratings.isEmpty(), s"No ratings available from $ratings")
  require(intermediateRDDStorageLevel != StorageLevel.NONE,
    "ALS is not designed to run without persisting intermediate RDDs.")
  val sc = ratings.sparkContext
  val userPart = new ALSPartitioner(numUserBlocks)
  val itemPart = new ALSPartitioner(numItemBlocks)
  val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
  val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
  val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
  val blockRatings = partitionRatings(ratings, userPart, itemPart)
    .persist(intermediateRDDStorageLevel)
  val (userInBlocks, userOutBlocks) =
    makeBlocks("user", blockRatings, userPart, itemPart, intermediateRDDStorageLevel)
  // materialize blockRatings and user blocks
  userOutBlocks.count()
  val swappedBlockRatings = blockRatings.map {
    case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) =>
      ((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings))
  }
  val (itemInBlocks, itemOutBlocks) =
    makeBlocks("item", swappedBlockRatings, itemPart, userPart, intermediateRDDStorageLevel)
  // materialize item blocks
  itemOutBlocks.count()
  val seedGen = new XORShiftRandom(seed)
  var userFactors = initialize(userInBlocks, rank, seedGen.nextLong())
  var itemFactors = initialize(itemInBlocks, rank, seedGen.nextLong())
  var previousCheckpointFile: Option[String] = None
  val shouldCheckpoint: Int => Boolean = (iter) =>
    sc.checkpointDir.isDefined && checkpointInterval != -1 && (iter % checkpointInterval == 0)
  val deletePreviousCheckpointFile: () => Unit = () =>
    previousCheckpointFile.foreach { file =>
      try {
        val checkpointFile = new Path(file)
        checkpointFile.getFileSystem(sc.hadoopConfiguration).delete(checkpointFile, true)
      } catch {
        case e: IOException =>
          logWarning(s"Cannot delete checkpoint file $file:", e)
      }
    }
  if (implicitPrefs) {
    for (iter <- 1 to maxIter) {
      userFactors.setName(s"userFactors-$iter").persist(intermediateRDDStorageLevel)
      val previousItemFactors = itemFactors
      itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam,
        userLocalIndexEncoder, implicitPrefs, alpha, solver)
      previousItemFactors.unpersist()
      itemFactors.setName(s"itemFactors-$iter").persist(intermediateRDDStorageLevel)
      // TODO: Generalize PeriodicGraphCheckpointer and use it here.
      val deps = itemFactors.dependencies
      if (shouldCheckpoint(iter)) {
        itemFactors.checkpoint() // itemFactors gets materialized in computeFactors
      }
      val previousUserFactors = userFactors
      userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, regParam,
        itemLocalIndexEncoder, implicitPrefs, alpha, solver)
      if (shouldCheckpoint(iter)) {
        ALS.cleanShuffleDependencies(sc, deps)
        deletePreviousCheckpointFile()
        previousCheckpointFile = itemFactors.getCheckpointFile
      }
      previousUserFactors.unpersist()
    }
  } else {
    for (iter <- 0 until maxIter) {
      itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam,
        userLocalIndexEncoder, solver = solver)
      if (shouldCheckpoint(iter)) {
        val deps = itemFactors.dependencies
        itemFactors.checkpoint()
        itemFactors.count() // checkpoint item factors and cut lineage
        ALS.cleanShuffleDependencies(sc, deps)
        deletePreviousCheckpointFile()
        previousCheckpointFile = itemFactors.getCheckpointFile
      }
      userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, regParam,
        itemLocalIndexEncoder, solver = solver)
    }
  }
  val userIdAndFactors = userInBlocks
    .mapValues(_.srcIds)
    .join(userFactors)
    .mapPartitions({ items =>
      items.flatMap { case (_, (ids, factors)) =>
        ids.view.zip(factors)
      }
    // Preserve the partitioning because IDs are consistent with the partitioners in userInBlocks
    // and userFactors.
    }, preservesPartitioning = true)
    .setName("userFactors")
    .persist(finalRDDStorageLevel)
  val itemIdAndFactors = itemInBlocks
    .mapValues(_.srcIds)
    .join(itemFactors)
    .mapPartitions({ items =>
      items.flatMap { case (_, (ids, factors)) =>
        ids.view.zip(factors)
      }
    }, preservesPartitioning = true)
    .setName("itemFactors")
    .persist(finalRDDStorageLevel)
  if (finalRDDStorageLevel != StorageLevel.NONE) {
    userIdAndFactors.count()
    itemFactors.unpersist()
    itemIdAndFactors.count()
    userInBlocks.unpersist()
    userOutBlocks.unpersist()
    itemInBlocks.unpersist()
    itemOutBlocks.unpersist()
    blockRatings.unpersist()
  }
  (userIdAndFactors, itemIdAndFactors)
}
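For context, here is a minimal sketch of how a job typically reaches this train() path through the DataFrame API used in the 2.x walkthrough; the column names and hyperparameters are illustrative assumptions, not taken from the failing job.

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession

object AlsJobSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("als-demo").getOrCreate()
    import spark.implicits._

    // (userId, itemId, rating) triples; in the real job these come from actual data.
    val ratings = Seq((1, 10, 4.0f), (1, 11, 2.0f), (2, 10, 5.0f), (2, 12, 1.0f))
      .toDF("userId", "itemId", "rating")

    val als = new ALS()
      .setRank(10)
      .setMaxIter(10)
      .setRegParam(0.1)
      .setUserCol("userId")
      .setItemCol("itemId")
      .setRatingCol("rating")

    // fit() eventually calls the train() method shown above; with nonnegative = false
    // every per-user / per-item subproblem is handled by CholeskySolver.
    val model = als.fit(ratings)
    model.itemFactors.show(false)
    spark.stop()
  }
}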
For a walkthrough of the train method above I can recommend a blog post that is written very well and was a great help when reading the source to track down this problem: the ALS source code analysis. The part that matters most here is this call:
val previousItemFactors = itemFactors
itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam,
  userLocalIndexEncoder, implicitPrefs, alpha, solver)
The call goes into computeFactors:

private def computeFactors[ID](
    srcFactorBlocks: RDD[(Int, FactorBlock)],
    srcOutBlocks: RDD[(Int, OutBlock)],
    dstInBlocks: RDD[(Int, InBlock[ID])],
    rank: Int,
    regParam: Double,
    srcEncoder: LocalIndexEncoder,
    implicitPrefs: Boolean = false,
    alpha: Double = 1.0,
    solver: LeastSquaresNESolver): RDD[(Int, FactorBlock)] = {
  val numSrcBlocks = srcFactorBlocks.partitions.length
  // handles both implicit and explicit feedback; the default is explicit
  val YtY = if (implicitPrefs) Some(computeYtY(srcFactorBlocks, rank)) else None
  val srcOut = srcOutBlocks.join(srcFactorBlocks).flatMap {
    case (srcBlockId, (srcOutBlock, srcFactors)) =>
      srcOutBlock.view.zipWithIndex.map { case (activeIndices, dstBlockId) =>
        (dstBlockId, (srcBlockId, activeIndices.map(idx => srcFactors(idx))))
      }
  }
  val merged = srcOut.groupByKey(new ALSPartitioner(dstInBlocks.partitions.length))
  dstInBlocks.join(merged).mapValues {
    case (InBlock(dstIds, srcPtrs, srcEncodedIndices, ratings), srcFactors) =>
      val sortedSrcFactors = new Array[FactorBlock](numSrcBlocks)
      srcFactors.foreach { case (srcBlockId, factors) =>
        sortedSrcFactors(srcBlockId) = factors
      }
      val dstFactors = new Array[Array[Float]](dstIds.length)
      var j = 0
      val ls = new NormalEquation(rank)
      // taking the item factor matrix as an example, this loop iterates over every item
      while (j < dstIds.length) {
        ls.reset()
        if (implicitPrefs) {
          ls.merge(YtY.get)
        }
        var i = srcPtrs(j)
        var numExplicits = 0
        // iterate over every user associated with the current item
        while (i < srcPtrs(j + 1)) {
          // the encoded index of the source factor
          val encoded = srcEncodedIndices(i)
          // decode the block (partition) ID and local index from the encoded index
          val blockId = srcEncoder.blockId(encoded)
          val localIndex = srcEncoder.localIndex(encoded)
          val srcFactor = sortedSrcFactors(blockId)(localIndex)
          val rating = ratings(i)
          if (implicitPrefs) {
            // Extension to the original paper to handle rating < 0. confidence is a function
            // of |rating| instead so that it is never negative. c1 is confidence - 1.
            val c1 = alpha * math.abs(rating)
            // For rating <= 0, the corresponding preference is 0. So the second argument of add
            // is only there for rating > 0.
            if (rating > 0.0) {
              numExplicits += 1
            }
            ls.add(srcFactor, if (rating > 0.0) 1.0 + c1 else 0.0, c1)
          } else {
            ls.add(srcFactor, rating)
            numExplicits += 1
          }
          i += 1
        }
        // Weight lambda by the number of explicit ratings based on the ALS-WR paper.
        // solve the weighted least squares problem here, using Cholesky decomposition
        dstFactors(j) = solver.solve(ls, numExplicits * regParam)
        j += 1
      }
      dstFactors
  }
}
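In the explicit branch, the NormalEquation object ls accumulates, for each item j, the ridge-regularized normal equations over the users i who rated it, and solver.solve(ls, numExplicits * regParam) then solves the system below (written here as a formula, with n_j = numExplicits, \lambda = regParam, u_i the user factor vectors and r_{ij} the ratings):

\left( \sum_{i \in R(j)} u_i u_i^{\top} + n_j \lambda I \right) x_j = \sum_{i \in R(j)} r_{ij}\, u_i

The left-hand matrix is the AtA that CholeskySolver factorizes; weighting \lambda by n_j is the ALS-WR trick mentioned in the code comment.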
The CholeskySolver code is shown below:
private[recommendation] class CholeskySolver extends LeastSquaresNESolver {

  /**
   * Solves a least squares problem with L2 regularization:
   *
   *   min norm(A x - b)^2^ + lambda * norm(x)^2^
   *
   * @param ne a [[NormalEquation]] instance that contains AtA, Atb, and n (number of instances)
   * @param lambda regularization constant
   * @return the solution x
   */
  override def solve(ne: NormalEquation, lambda: Double): Array[Float] = {
    val k = ne.k
    // Add scaled lambda to the diagonals of AtA.
    var i = 0
    var j = 2
    while (i < ne.triK) {
      ne.ata(i) += lambda
      i += j
      j += 1
    }
    // the factorization itself is done by LAPACK; Spark only wraps the API here
    CholeskyDecomposition.solve(ne.ata, ne.atb)
    val x = new Array[Float](k)
    i = 0
    while (i < k) {
      x(i) = ne.atb(i).toFloat
      i += 1
    }
    ne.reset()
    x
  }
}
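A quick sketch of what the "i += j; j += 1" loop above is doing: ne.ata holds the upper triangle of AtA in packed order, and this walk visits exactly the diagonal positions, so only the diagonal of AtA receives the lambda term. The helper below is illustrative, not Spark code.

object PackedDiagonalDemo {
  // Return the indices of the diagonal entries in a packed upper triangle of size k.
  def diagonalIndices(k: Int): List[Int] = {
    val triK = k * (k + 1) / 2
    val idx = scala.collection.mutable.ListBuffer.empty[Int]
    var i = 0
    var j = 2
    while (i < triK) {
      idx += i
      i += j
      j += 1
    }
    idx.toList
  }

  def main(args: Array[String]): Unit = {
    // For rank k = 4 the packed upper triangle has 10 entries and
    // the diagonal sits at positions 0, 2, 5, 9.
    println(diagonalIndices(4))  // prints: List(0, 2, 5, 9)
  }
}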
The math behind the Cholesky factorization is explained in this article: Cholesky decomposition.
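As a tiny worked example of the factorization: a symmetric positive definite matrix splits into A = L L^{\top} with L lower triangular, for instance

A = \begin{pmatrix} 4 & 2 \\ 2 & 3 \end{pmatrix} = \begin{pmatrix} 2 & 0 \\ 1 & \sqrt{2} \end{pmatrix} \begin{pmatrix} 2 & 1 \\ 0 & \sqrt{2} \end{pmatrix}

The factorization only goes through when every diagonal pivot (here 4, then 3 - 1^2 = 2) stays strictly positive; that is exactly the positive-definiteness requirement, and a non-positive pivot is what makes LAPACK return the positive info code seen in the error.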
When this least squares problem is solved, the Cholesky factorization of the AtA matrix on the left-hand side fails. The cause here was that the values in this symmetric matrix were too small: Cholesky factorization requires the matrix to be positive definite, and when that condition is not met the factorization throws the exception and aborts the job. When constructing the ratings it is therefore advisable to set a sensible minimum for the rating values, for example starting the scale at 1.

PS: Material found online also indicates that if several columns of the data have zero variance, the same failure appears even when the values are not small; in that case the matrix A is simply not of full column rank.
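Below is a minimal sketch of the suggested workaround: shift the raw scores so the minimum rating is 1 before handing the data to ALS, and reject constant (zero-variance) score columns. The column names "score" and "rating" and the helper itself are illustrative assumptions, not part of the original job.

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions._

object RatingPreprocessing {
  // Shift scores so the smallest rating becomes 1, and fail fast on a constant column,
  // since a zero-variance column also leads to a rank-deficient A.
  def shiftRatings(raw: DataFrame): DataFrame = {
    val Row(minScore: Double, maxScore: Double) =
      raw.agg(min(col("score")).cast("double"), max(col("score")).cast("double")).head()
    require(maxScore > minScore,
      "score column has zero variance; the ALS input matrix would not be full column rank")
    raw.withColumn("rating", col("score") - lit(minScore) + lit(1.0))
  }
}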