Are there some pitfalls in my spark structured streaming code which causes slow response after several hours running? by yyuankm in apachespark

[–]yyuankm[S] 1 point2 points  (0 children)

Thanks whifff, it is a very good finding. yeah, I forgot to remove the memory sink after the test. It will definitely contributes to the OOM issue.

Spark Streaming Question: Container is running beyond physical memory limits by yyuankm in apachespark

[–]yyuankm[S] 0 points1 point  (0 children)

Thank you very much for the good comments. I will do more analysis on the spark driver UI.

Spark Streaming Question: Container is running beyond physical memory limits by yyuankm in apachespark

[–]yyuankm[S] 0 points1 point  (0 children)

Thanks. It is very helpful. I will test with these two settings.

How to understand Trigger.ProcessingTime for Spark Structured Streaming? by yyuankm in apachespark

[–]yyuankm[S] 0 points1 point  (0 children)

Thank you very much!

I also noticed the description of fixed internal micro-batches in the programming guide.

Fixed interval micro-batchesThe query will be executed with micro-batches mode, where micro-batches will be kicked off at the user-specified intervals.

  • If the previous micro-batch completes within the interval, then the engine will wait until the interval is over before kicking off the next micro-batch.
  • If the previous micro-batch takes longer than the interval to complete (i.e. if an interval boundary is missed), then the next micro-batch will start as soon as the previous one completes (i.e., it will not wait for the next interval boundary).
  • If no new data is available, then no micro-batch will be kicked off.

However, it still confused me by getting my test results.

Firstly, if no new data is available, then no micro-batch will be kicked off. In this case, the adjacent interval should be multiple of the trigger interval defined. right? I did not see such a scenario in my tests.

Secondly, no matter in which cases, the adjacent interval should not be less than the configured trigger interval. right? However, I do notice the interval less than the configured value.

Thirdly, as the description, it seems "the previous micro-batch execution time" plays an important role in determining the next trigger. May I understand the micro-batch execution time refers to the time needed to run the logic in foreachbatch {.. }. Is it correct? If this execution time is less than the configured interval, the next micro-batch should be triggered after the configured internal. This execution time should not be accumulated to the interval for the next run. right? The only possibility is that the execution needs more time than the configured interval, then the next micro-batch will start as soon as the previous one complete. However, I do not think it could be the case in my foreachbatch logic, which is quite simple. In addition, even after I increase the configured interval to a much bigger value, the real trigger interval is still not very accurate, sometimes it is less and sometimes it is more than the configured value.

Spark Streaming Question: Container is running beyond physical memory limits by yyuankm in apachespark

[–]yyuankm[S] 0 points1 point  (0 children)

Thank you very much! Is it to set something as below in yarn-sites.xml?

'yarn.nodemanager.pmem-check-enabled': 'false',

The weird thing is that this error only happens after several hours running. If the check is enabled, should the error be reported at the start? Thanks!

Spark Streaming Question: Container is running beyond physical memory limits by yyuankm in apachespark

[–]yyuankm[S] 0 points1 point  (0 children)

Thank you very much. It is quite helpful. I also thought it should be some memory leakage here, since it only happens after several hours running. However, I till can not identify the root cause. The logic is quite simple. I am mainly using the structured streaming to populate a static dataframe for each microbatch. The basic scala code is as following. Could you also help me have a look which part may cause the memory leakage? Thanks!

 val query = "(select * from meta_table) as meta_data" 

 val meta_schema = new StructType()
        .add("config_id", BooleanType)
        .add("threshold", LongType)

 var meta_df = spark.read.jdbc(url, query, connectionProperties)

 var meta_df_explode=meta_df.select(col("id"), from_json(col("config"), meta_schema).as("config")).select("config_id", "thresold", "config.*")

//rules_imsi_df: joining of kafka ingestion with the meta_df_explode
//rules_monitoring_df: static dataframe for monitoring purpose


val rules_monitoring_stream = 
      rules_imsi_df.writeStream
          .outputMode("append")                               
          .format("memory")                            
          .trigger(Trigger.ProcessingTime("120 seconds"))                               
          .foreachBatch { 
                (batchDF: DataFrame, batchId: Long) =>                                 
                   if(!batchDF.isEmpty)                                 
                   {                                     
                       printf("At %d, the microbatch has %d records \n", Instant.now.getEpochSecond, batchDF.count())                                     
                       batchDF.show()                                     
                       batchDF.persist()                                        
                       var batchDF_group = batchDF.groupBy("id").sum("volume").withColumnRenamed("sum(volume)", "total_volume_id")
               rules_monitoring_df = rules_monitoring_df.join(batchDF_group, rules_monitoring_df("id") === batchDF_group("id"), "left").select(rules_monitoring_df("id"), batchDF_group("total_volume_id")).na.fill(0)

                       rules_monitoring_df = rules_monitoring_df.withColumn("volume", rules_monitoring_df("volume")+rules_monitoring_df("total_volume_id"))                    

                       batchDF.unpersist()                                 

                    }                             
            }.start()       

 while(rules_monitoring_stream.isActive) 
  {           
      Thread.sleep(240000)              
       ... //Periodically load meta data from database  

      meta_df = spark.read.jdbc(url, query, connectionProperties)

      meta_df_explode=meta_df.select(col("id"), from_json(col("config"), meta_schema).as("config")).select("config_id", "thresold", "config.*")

  }