Kafka-Flink-Stream processing: Is there a way to reload input files into the variables being used in a streaming process

Flink can monitor a directory and ingest files as they are moved into it; that may be what you are looking for. See the PROCESS_CONTINUOUSLY option for readFile in the documentation.
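To make the PROCESS_CONTINUOUSLY idea concrete: in Flink you pass FileProcessingMode.PROCESS_CONTINUOUSLY and a scan interval to StreamExecutionEnvironment.readFile(...), and Flink re-scans the path on that interval. The plain-Java sketch below only mimics the "pick up files that appeared since the last scan" part. DirectoryRescanner is a hypothetical name, and unlike Flink (whose documentation notes that a modified file is re-processed in full) it tracks file paths only, so edits to an already-seen file are ignored.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical plain-Java illustration of periodic directory re-scanning.
 * Each poll() returns only regular files not returned by an earlier poll().
 * This is NOT Flink's implementation; it ignores modifications to seen files.
 */
final class DirectoryRescanner {
    private final Path dir;
    private final Set<Path> seen = new HashSet<>();

    DirectoryRescanner(Path dir) {
        this.dir = dir;
    }

    /** One scan of the directory: returns newly appeared files, sorted by path. */
    List<Path> poll() throws IOException {
        List<Path> fresh = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path p : entries) {
                // seen.add() is false for paths already reported, so each file is returned once.
                if (Files.isRegularFile(p) && seen.add(p)) {
                    fresh.add(p);
                }
            }
        }
        fresh.sort(null); // natural ordering: Path implements Comparable<Path>
        return fresh;
    }
}
```

Calling poll() on a fixed timer and reading each returned file is roughly what the continuous mode automates for you inside the Flink source.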
However, if the data is already in Kafka, it would be much more natural to use Flink's Kafka consumer to stream it directly into Flink. The documentation also covers the Kafka connector, and the Flink training includes an exercise on using Kafka with Flink.

Tag : apache-kafka , By : dbarbot
Date : November 22 2020, 11:00 PM
Your understanding of micro-batch vs. stream processing is correct. You are also right that all three systems use the standard Java consumer provided by Kafka to pull data for processing in an infinite loop.
The main difference is that Spark needs to schedule a new job for each micro-batch it processes. This scheduling overhead is quite high, so Spark cannot handle very low batch intervals such as 100 ms or 50 ms efficiently, and throughput drops for those small batches.
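A toy cost model makes the overhead argument concrete. Assume, purely for illustration (these are not measured Spark numbers), that each micro-batch pays a fixed scheduling overhead o before any record is processed and that each record costs c. With batch interval T, only T - o of every interval is left for records, so the sustainable rate is (T - o) / (c * T) records per millisecond. As T approaches o, throughput falls to zero. MicroBatchModel is a hypothetical name.

```java
/**
 * Toy cost model (illustrative assumption, not measured Spark behaviour):
 * every micro-batch pays a fixed scheduling overhead before processing records.
 */
final class MicroBatchModel {
    private MicroBatchModel() {}

    /**
     * Maximum sustainable throughput in records per second.
     *
     * @param batchIntervalMs      length of one micro-batch interval T (ms)
     * @param schedulingOverheadMs fixed per-batch overhead o (ms)
     * @param perRecordMs          processing cost per record c (ms)
     */
    static double maxThroughputPerSec(double batchIntervalMs,
                                      double schedulingOverheadMs,
                                      double perRecordMs) {
        // Time left in each interval after the overhead; never negative.
        double usableMs = Math.max(0.0, batchIntervalMs - schedulingOverheadMs);
        double recordsPerBatch = usableMs / perRecordMs;
        // Scale from "records per interval" to "records per second".
        return recordsPerBatch * (1000.0 / batchIntervalMs);
    }
}
```

With the hypothetical values o = 50 ms and c = 0.5 ms, a 1000 ms interval gives 1900 records/s, a 100 ms interval gives 1000 records/s, and a 50 ms interval gives 0. The model ignores parallelism and pipelining; it only shows why a fixed per-batch cost hurts small batches.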

Exception when processing data during Kafka stream process

Tag : apache-kafka , By : user143038
Date : March 29 2020, 07:55 AM
You have to specify the correct Serdes for the to() operation, too. Otherwise, it uses the default Serdes from the StreamsConfig, which in your case is ByteArraySerde, and a String cannot be cast to byte[].
You need to do:
.to(streamoutputtopic, Produced.with(Serdes.String(), Serdes.String()));
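Why does this only fail at runtime, with a ClassCastException? The default serde is configured by class name in the StreamsConfig properties, so the compiler cannot check it against the stream's value type, and because generics are erased the cast happens inside the serializer itself. The self-contained sketch below reproduces that mechanism with hypothetical stand-in types; Serializer, ByteArraySer and StringSer are not Kafka's classes.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical stand-ins (not Kafka's API) showing why a serializer mismatch
 * surfaces as a ClassCastException at runtime rather than a compile error.
 */
final class SerdeMismatchDemo {
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    /** Stand-in for a byte[] serializer: passes bytes through unchanged. */
    static final class ByteArraySer implements Serializer<byte[]> {
        public byte[] serialize(byte[] data) {
            return data;
        }
    }

    /** Stand-in for a String serializer: encodes the value as UTF-8. */
    static final class StringSer implements Serializer<String> {
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Mimics a sink that only holds an untyped serializer for an untyped value. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static byte[] write(Object value, Serializer serializer) {
        // Compiles fine; ByteArraySer's compiler-generated bridge method casts value to byte[].
        return serializer.serialize(value);
    }

    /** Returns true if writing the value fails with a ClassCastException. */
    static boolean failsWithClassCast(Object value, Serializer<?> serializer) {
        try {
            write(value, serializer);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

Passing Produced.with(Serdes.String(), Serdes.String()) corresponds to handing the sink the StringSer case instead of relying on the configured default.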

When using kafka+beam+flink. Found interface org.apache.flink.streaming.api.operators.InternalTimer, but class was expected

Tag : java , By : PPD
Date : March 29 2020, 07:55 AM
I had the same issue and fixed it by making sure the Flink version I was using is compatible with the Beam version.

Tag : java , By : Fernando
Date : November 24 2020, 05:48 AM
You can implement your own custom KafkaDeserializationSchema, like this:
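import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
// Emits (topic name, UTF-8 decoded value) for every Kafka record.
public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false; // the Kafka stream is unbounded, so never signal the end
    }
    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) {
        return new Tuple2<>(record.topic(), new String(record.value(), StandardCharsets.UTF_8));
    }
    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}

// In the job's main method (topicPattern, kafkaConsumerProps and env come from your own setup):
FlinkKafkaConsumer010<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer010<>(
        topicPattern, new CustomKafkaDeserializationSchema(), kafkaConsumerProps);
DataStream<Tuple2<String, String>> input = env.addSource(kafkaConsumer);
input.process(new ProcessFunction<Tuple2<String, String>, String>() {
    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
        String topicName = value.f0; // the topic this record was read from
        // your processing logic here
    }
});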
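Once every element carries its source topic in f0, the "your processing logic here" part often reduces to choosing a handler per topic. A plain-Java sketch of that dispatch follows; TopicDispatcher and the example topic names are hypothetical. Inside processElement you would call something like handle(value.f0, value.f1) and emit any present result with out.collect(...).

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/**
 * Hypothetical per-topic dispatch: picks a handler based on the topic name
 * that the custom deserialization schema attached to each element.
 */
final class TopicDispatcher {
    private final Map<String, Function<String, String>> handlers;

    TopicDispatcher(Map<String, Function<String, String>> handlers) {
        this.handlers = Map.copyOf(handlers); // immutable defensive copy
    }

    /** Applies the handler registered for the topic; empty if the topic is unknown. */
    Optional<String> handle(String topic, String value) {
        Function<String, String> h = handlers.get(topic);
        return h == null ? Optional.empty() : Optional.of(h.apply(value));
    }
}
```

Returning Optional.empty() for unknown topics keeps the choice of dropping, logging, or side-routing those records with the caller.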

Passing elements back to the input stream, after processing, in Flink?

Tag : java , By : mhedberg
Date : March 29 2020, 07:55 AM
Here are two solutions. They are more or less equivalent in their underlying behavior, but you might find one or the other easier to understand, maintain, or test.
As for your question, no, there is no way to loop back (re-queue) the unconsumed events without pushing them back to Kinesis. But simply holding on to them until they are needed should be fine.
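Holding on to events until they are needed can be sketched as a per-key buffer that fills while an event cannot be processed yet and is drained when its trigger arrives. This plain-Java version (HoldUntilReady and its methods are hypothetical names) uses an ordinary HashMap, so it is neither fault tolerant nor distributed; in a Flink job the buffer would normally live in keyed state, such as a ListState, so that it is checkpointed and partitioned by key.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory buffer: holds events per key instead of re-queuing
 * them upstream, and releases them in arrival order when asked.
 */
final class HoldUntilReady<E> {
    private final Map<String, Deque<E>> pending = new HashMap<>();

    /** Buffers an event that cannot be processed yet. */
    void hold(String key, E event) {
        pending.computeIfAbsent(key, k -> new ArrayDeque<>()).addLast(event);
    }

    /** Returns all buffered events for the key in arrival order and forgets them. */
    List<E> release(String key) {
        Deque<E> events = pending.remove(key);
        return events == null ? List.of() : new ArrayList<>(events);
    }

    /** Number of events currently buffered for the key. */
    int pendingCount(String key) {
        Deque<E> events = pending.get(key);
        return events == null ? 0 : events.size();
    }
}
```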
    Privacy Policy - Terms - Contact Us © scrbit.com