Anonymous
Not applicable

Structured Streaming with multiple queries is not picking up the next batch after processing the first

I'm streaming multiple queries in a single Spark job: it takes parquet files from a source folder and upserts them to a Delta table in the lakehouse in the foreachBatch function. It worked fine and all streams are active and running without any error, but it is not picking up the next batch even when new files are added to the source. What could be the reason for this?

 

 

stream_count = 0
for row in control_df.collect():
    table = os.path.basename(row.TargetFiles)
    source_path = f"{self.data_lake_container}/{row.SourceFiles}"
    schema = self.infer_schema_from_file(source_path, "parquet")

    # Read new parquet files from the source folder as a stream
    df_read = spark.readStream \
        .schema(schema) \
        .format("parquet") \
        .option("recursiveFileLookup", "true") \
        .load(source_path)

    # Write streaming data to destination, upserting each micro-batch via foreachBatch
    query = df_read.writeStream \
        .foreachBatch(self.streamwriter) \
        .trigger(processingTime="60 seconds") \
        .queryName(table) \
        .option("checkpointLocation", f"{self.data_lake_container}/{self.checkpoint_path}/{table}") \
        .start()

for q in spark.streams.active:
    stream_count += 1
    print(f"ID: {q.id},\nName: {q.name},\nStatus: {q.status},\nLast Progress: {q.lastProgress}\n\n")

print(stream_count)
self.logger.info(f"Created streams for {stream_count} tables in {self.target_lakehouse} lakehouse")
query.awaitTermination()
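For reference, streamwriter is a foreachBatch callback that merges each micro-batch into the target Delta table. A simplified sketch of such a handler (the merge key "Id" and the target path below are placeholders, not the real implementation):

from delta.tables import DeltaTable

def streamwriter(self, batch_df, batch_id):
    # Placeholder target path and merge key; the real code resolves these per table
    target_path = f"{self.target_lakehouse}/Tables/my_table"
    spark_session = batch_df.sparkSession
    if DeltaTable.isDeltaTable(spark_session, target_path):
        # Upsert: update matching rows, insert new ones
        (DeltaTable.forPath(spark_session, target_path).alias("t")
            .merge(batch_df.alias("s"), "t.Id = s.Id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())
    else:
        # First run: create the Delta table from the initial batch
        batch_df.write.format("delta").save(target_path)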
2 REPLIES
Anonymous
Not applicable

Hi @Anonymous ,

 

I have the following two suggestions:

 

In your code, the processingTime trigger is set to 60 seconds. If new files are added after the trigger interval, they may not be captured immediately. Please try increasing the interval.
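For example, changing the interval only means adjusting the processingTime value on your existing writeStream (120 seconds is just an illustrative value):

query = df_read.writeStream \
    .foreachBatch(self.streamwriter) \
    .trigger(processingTime="120 seconds") \
    .queryName(table) \
    .option("checkpointLocation", f"{self.data_lake_container}/{self.checkpoint_path}/{table}") \
    .start()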

 

Make sure that new files added to the source directory have unique names and do not overwrite existing files. If a file name is reused, Spark may not detect the change.

 

If none of this solves your problem, as an alternative you can recreate the Spark job so that it processes all files (including the new ones) in the source at once.
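If you do re-run the job over the full set of files, one option worth mentioning is the availableNow trigger (Spark 3.3 and later), which processes everything currently in the source in a single run and then stops. A minimal sketch, reusing the same checkpoint so files that were already processed are skipped:

query = df_read.writeStream \
    .foreachBatch(self.streamwriter) \
    .trigger(availableNow=True) \
    .queryName(table) \
    .option("checkpointLocation", f"{self.data_lake_container}/{self.checkpoint_path}/{table}") \
    .start()
query.awaitTermination()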

 

Best Regards,
Yang
Community Support Team

 

If any post helps, please consider accepting it as the solution to help other members find it more quickly.
If I have misunderstood your needs or you still have problems, please feel free to let us know. Thanks a lot!

Anonymous
Not applicable

Hi @Anonymous ,

 

This is just a follow-up to ask whether the problem has been solved.

 

If so, could you accept the correct answer as the solution, or share your own solution, to help other members find it faster?

 

Thank you very much for your cooperation!

 

Best Regards,
Yang
Community Support Team

 

If any post helps, please consider accepting it as the solution to help other members find it more quickly.
If I have misunderstood your needs or you still have problems, please feel free to let us know. Thanks a lot!
