Skip to content

Commit

Permalink
[FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.…
Browse files Browse the repository at this point in the history
…Builder.cleanupInRocksdbCompactFilter
  • Loading branch information
1996fanrui committed Mar 11, 2024
1 parent d6a4eb9 commit 7bfa1d3
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 23 deletions.
9 changes: 5 additions & 4 deletions docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter(1000, Time.hours(1))
.cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
.build();
```
{{< /tab >}}
Expand All @@ -548,18 +548,19 @@ import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter(1000, Time.hours(1))
.cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
.build
```
{{< /tab >}}
{{< tab "Python" >}}
```python
from pyflink.common import Duration
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
.new_builder(Time.seconds(1)) \
.cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \
.cleanup_in_rocksdb_compact_filter(1000, Duration.of_hours(1)) \
.build()
```
{{< /tab >}}
Expand All @@ -573,7 +574,7 @@ RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一
定期压缩可以加速过期状态条目的清理,特别是对于很少访问的状态条目。
比这个值早的文件将被选取进行压缩,并重新写入与之前相同的 Level 中。
该功能可以确保文件定期通过压缩过滤器压缩。
您可以通过`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicCompactionTime)`
您可以通过`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime)`
方法设定定期压缩的时间。
定期压缩的时间的默认值是 30 天。
您可以将其设置为 0 以关闭定期压缩或设置一个较小的值以加速过期状态条目的清理,但它将会触发更多压缩。
Expand Down
9 changes: 5 additions & 4 deletions docs/content/docs/dev/datastream/fault-tolerance/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter(1000, Time.hours(1))
.cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
.build();
```
{{< /tab >}}
Expand All @@ -611,18 +611,19 @@ import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter(1000, Time.hours(1))
.cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
.build
```
{{< /tab >}}
{{< tab "Python" >}}
```python
from pyflink.common import Duration
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
.new_builder(Time.seconds(1)) \
.cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \
.cleanup_in_rocksdb_compact_filter(1000, Duration.of_hours(1)) \
.build()
```
{{< /tab >}}
Expand All @@ -640,7 +641,7 @@ Periodic compaction could speed up expired state entries cleanup, especially for
Files older than this value will be picked up for compaction, and re-written to the same level as they were before.
It makes sure a file goes through compaction filters periodically.
You can change it and pass a custom value to
`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicCompactionTime)` method.
`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime)` method.
The default value of Periodic compaction seconds is 30 days.
You could set it to 0 to turn off periodic compaction or set a small value to speed up expired state entries cleanup, but it
would trigger more compactions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public Builder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) {
*/
@Nonnull
public Builder cleanupInRocksdbCompactFilter(
long queryTimeAfterNumEntries, Time periodicCompactionTime) {
long queryTimeAfterNumEntries, Duration periodicCompactionTime) {
strategies.put(
CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER,
new RocksdbCompactFilterCleanupStrategy(
Expand All @@ -354,7 +354,7 @@ public Builder cleanupInRocksdbCompactFilter(
*
* <p>If some specific cleanup is configured, e.g. {@link #cleanupIncrementally(int,
* boolean)} or {@link #cleanupInRocksdbCompactFilter(long)} or {@link
* #cleanupInRocksdbCompactFilter(long, Time)} , this setting does not disable it.
* #cleanupInRocksdbCompactFilter(long, Duration)} , this setting does not disable it.
*/
@Nonnull
public Builder disableCleanupInBackground() {
Expand Down Expand Up @@ -497,7 +497,7 @@ public static class RocksdbCompactFilterCleanupStrategy
* Default value is 30 days so that every file goes through the compaction process at least
* once every 30 days if not compacted sooner.
*/
static final Time DEFAULT_PERIODIC_COMPACTION_TIME = Time.days(30);
static final Duration DEFAULT_PERIODIC_COMPACTION_TIME = Duration.ofDays(30);

static final RocksdbCompactFilterCleanupStrategy
DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY =
Expand All @@ -515,14 +515,14 @@ public static class RocksdbCompactFilterCleanupStrategy
* and re-written to the same level as they were before. It makes sure a file goes through
* compaction filters periodically. 0 means turning off periodic compaction.
*/
private final Time periodicCompactionTime;
private final Duration periodicCompactionTime;

private RocksdbCompactFilterCleanupStrategy(long queryTimeAfterNumEntries) {
this(queryTimeAfterNumEntries, DEFAULT_PERIODIC_COMPACTION_TIME);
}

private RocksdbCompactFilterCleanupStrategy(
long queryTimeAfterNumEntries, Time periodicCompactionTime) {
long queryTimeAfterNumEntries, Duration periodicCompactionTime) {
this.queryTimeAfterNumEntries = queryTimeAfterNumEntries;
this.periodicCompactionTime = periodicCompactionTime;
}
Expand All @@ -531,7 +531,7 @@ public long getQueryTimeAfterNumEntries() {
return queryTimeAfterNumEntries;
}

public Time getPeriodicCompactionTime() {
public Duration getPeriodicCompactionTime() {
return periodicCompactionTime;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -71,7 +72,8 @@ void testStateTtlConfigBuildWithCleanupInBackground() {
assertThat(incrementalCleanupStrategy.getCleanupSize()).isEqualTo(5);
assertThat(incrementalCleanupStrategy.runCleanupForEveryRecord()).isFalse();
assertThat(rocksdbCleanupStrategy.getQueryTimeAfterNumEntries()).isEqualTo(1000L);
assertThat(rocksdbCleanupStrategy.getPeriodicCompactionTime()).isEqualTo(Time.days(30));
assertThat(rocksdbCleanupStrategy.getPeriodicCompactionTime())
.isEqualTo(Duration.ofDays(30));
}

@Test
Expand Down
8 changes: 4 additions & 4 deletions flink-python/pyflink/datastream/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from enum import Enum
from typing import TypeVar, Generic, Iterable, List, Iterator, Dict, Tuple, Optional

from pyflink.common.time import Time
from pyflink.common.time import Duration, Time
from pyflink.common.typeinfo import TypeInformation, Types

__all__ = [
Expand Down Expand Up @@ -809,7 +809,7 @@ def cleanup_incrementally(self,
def cleanup_in_rocksdb_compact_filter(
self,
query_time_after_num_entries,
periodic_compaction_time=Time.days(30)) -> \
periodic_compaction_time=Duration.of_days(30)) -> \
'StateTtlConfig.Builder':
"""
Cleanup expired state while Rocksdb compaction is running.
Expand Down Expand Up @@ -925,14 +925,14 @@ class RocksdbCompactFilterCleanupStrategy(CleanupStrategy):

def __init__(self,
query_time_after_num_entries: int,
periodic_compaction_time=Time.days(30)):
periodic_compaction_time=Duration.of_days(30)):
self._query_time_after_num_entries = query_time_after_num_entries
self._periodic_compaction_time = periodic_compaction_time

def get_query_time_after_num_entries(self) -> int:
return self._query_time_after_num_entries

def get_periodic_compaction_time(self) -> Time:
def get_periodic_compaction_time(self) -> Duration:
return self._periodic_compaction_time

EMPTY_STRATEGY = EmptyCleanupStrategy()
Expand Down
3 changes: 1 addition & 2 deletions flink-python/pyflink/fn_execution/embedded/java_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ def to_java_state_ttl_config(ttl_config: StateTtlConfig):

if rocksdb_compact_filter_cleanup_strategy:
j_ttl_config_builder.cleanupInRocksdbCompactFilter(
rocksdb_compact_filter_cleanup_strategy.get_query_time_after_num_entries(),
rocksdb_compact_filter_cleanup_strategy.get_periodic_compaction_time())
rocksdb_compact_filter_cleanup_strategy.get_query_time_after_num_entries())

return j_ttl_config_builder.build()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -118,6 +119,6 @@ void testParseStateTtlConfigFromProto() {
assertThat(rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries())
.isEqualTo(1000);
assertThat(rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime())
.isEqualTo(Time.days(30));
.isEqualTo(Duration.ofDays(30));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void configCompactFilter(
columnFamilyOptionsMap.get(stateDesc.getName());
Preconditions.checkNotNull(columnFamilyOptions);
columnFamilyOptions.setPeriodicCompactionSeconds(
rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime().toSeconds());
rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime().getSeconds());

long queryTimeAfterNumEntries =
rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries();
Expand Down

0 comments on commit 7bfa1d3

Please sign in to comment.