Skip to content

Commit d4665fa

Browse files
anishshri-dbHeartSaVioR
authored andcommitted
[SPARK-49677][SS] Ensure that changelog files are written on commit and forceSnapshot flag is also reset
### What changes were proposed in this pull request? Ensure that changelog files are written on commit and forceSnapshot flag is also reset ### Why are the changes needed? Without these changes, we are not writing the changelog files per batch and we are also trying to upload full snapshot each time since the flag is not being reset correctly ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit tests Before: ``` [info] Run completed in 3 seconds, 438 milliseconds. [info] Total number of tests run: 1 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0 [info] *** 1 TEST FAILED *** ``` After: ``` [info] Run completed in 4 seconds, 155 milliseconds. [info] Total number of tests run: 1 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#48125 from anishshri-db/task/SPARK-49677. Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent a5ac80a commit d4665fa

File tree

2 files changed

+49
-8
lines changed
  • sql/core/src

2 files changed

+49
-8
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -646,15 +646,15 @@ class RocksDB(
646646
// is enabled.
647647
if (shouldForceSnapshot.get()) {
648648
uploadSnapshot()
649+
shouldForceSnapshot.set(false)
650+
}
651+
652+
// ensure that changelog files are always written
653+
try {
654+
assert(changelogWriter.isDefined)
655+
changelogWriter.foreach(_.commit())
656+
} finally {
649657
changelogWriter = None
650-
changelogWriter.foreach(_.abort())
651-
} else {
652-
try {
653-
assert(changelogWriter.isDefined)
654-
changelogWriter.foreach(_.commit())
655-
} finally {
656-
changelogWriter = None
657-
}
658658
}
659659
} else {
660660
assert(changelogWriter.isEmpty)

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,47 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
811811
}
812812
}
813813

814+
testWithChangelogCheckpointingEnabled("RocksDB: ensure that changelog files are written " +
815+
"and snapshots uploaded optionally with changelog format v2") {
816+
withTempDir { dir =>
817+
val remoteDir = Utils.createTempDir().toString
818+
val conf = dbConf.copy(minDeltasForSnapshot = 5, compactOnCommit = false)
819+
new File(remoteDir).delete() // to make sure that the directory gets created
820+
withDB(remoteDir, conf = conf, useColumnFamilies = true) { db =>
821+
db.createColFamilyIfAbsent("test")
822+
db.load(0)
823+
db.put("a", "1")
824+
db.put("b", "2")
825+
db.commit()
826+
assert(changelogVersionsPresent(remoteDir) == Seq(1))
827+
assert(snapshotVersionsPresent(remoteDir) == Seq(1))
828+
829+
db.load(1)
830+
db.put("a", "3")
831+
db.put("c", "4")
832+
db.commit()
833+
834+
assert(changelogVersionsPresent(remoteDir) == Seq(1, 2))
835+
assert(snapshotVersionsPresent(remoteDir) == Seq(1))
836+
837+
db.removeColFamilyIfExists("test")
838+
db.load(2)
839+
db.remove("a")
840+
db.put("d", "5")
841+
db.commit()
842+
assert(changelogVersionsPresent(remoteDir) == Seq(1, 2, 3))
843+
assert(snapshotVersionsPresent(remoteDir) == Seq(1, 3))
844+
845+
db.load(3)
846+
db.put("e", "6")
847+
db.remove("b")
848+
db.commit()
849+
assert(changelogVersionsPresent(remoteDir) == Seq(1, 2, 3, 4))
850+
assert(snapshotVersionsPresent(remoteDir) == Seq(1, 3))
851+
}
852+
}
853+
}
854+
814855
test("RocksDB: ensure merge operation correctness") {
815856
withTempDir { dir =>
816857
val remoteDir = Utils.createTempDir().toString

0 commit comments

Comments
 (0)