Kafka Streams Persistent Store cleanup

By : nhuser
Date : November 28 2020, 04:01 AM

around this issue We were having a similar issue, we simply scheduled a job for cleaning the store in our processor/transformer. Just implement your isDataOld(nextValue) and you are good to go.
public void init(ProcessorContext context) {
this.kvStore = (KeyValueStore<Key, Value>) this.context.getStateStore("KV_STORE_NAME");
this.context.schedule(60000, PunctuationType.STREAM_TIME, (timestamp) -> {
    KeyValueIterator<Key, Value> iterator = kvStore.all();
    while (iterator.hasNext()){
    KeyValue<Key,Value> nextValue = iterator.next();
    if isDataOld(nextValue)


