#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import uuid
import json
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from abc import ABC, abstractmethod

from pyspark.sql import Row
from pyspark import cloudpickle

__all__ = ["StreamingQueryListener"]

if TYPE_CHECKING:
    from py4j.java_gateway import JavaObject

class StreamingQueryListener(ABC):
    """
    Interface for listening to events related to :class:`~pyspark.sql.streaming.StreamingQuery`.

    .. versionadded:: 3.4.0

    Notes
    -----
    The methods are not thread-safe as they may be called from different threads.
    The events received are identical to those of the Scala API. Refer to its documentation.

    This API is evolving.

    Examples
    --------
    >>> class MyListener(StreamingQueryListener):
    ...     def onQueryStarted(self, event: QueryStartedEvent) -> None:
    ...         # Do something with event.
    ...         pass
    ...
    ...     def onQueryProgress(self, event: QueryProgressEvent) -> None:
    ...         # Do something with event.
    ...         pass
    ...
    ...     def onQueryIdle(self, event: QueryIdleEvent) -> None:
    ...         # Do something with event.
    ...         pass
    ...
    ...     def onQueryTerminated(self, event: QueryTerminatedEvent) -> None:
    ...         # Do something with event.
    ...         pass
    ...
    >>> spark.streams.addListener(MyListener())
    """

    def _set_spark_session(
        self, spark: "SparkSession"  # type: ignore[name-defined] # noqa: F821
    ) -> None:
        self._sparkSession = spark

    @property
    def spark(self) -> Optional["SparkSession"]:  # type: ignore[name-defined] # noqa: F821
        if hasattr(self, "_sparkSession"):
            return self._sparkSession
        else:
            return None

    def _init_listener_id(self) -> None:
        self._id = str(uuid.uuid4())

    @abstractmethod
    def onQueryStarted(self, event: "QueryStartedEvent") -> None:
        """
        Called when a query is started.

        Notes
        -----
        This is called synchronously with
        :py:meth:`~pyspark.sql.streaming.DataStreamWriter.start`, that is,
        `onQueryStarted` will be called on all listeners before
        `DataStreamWriter.start()` returns the corresponding
        :class:`~pyspark.sql.streaming.StreamingQuery`.
        Please don't block this method as it will block your query.
        """
        pass

    @abstractmethod
    def onQueryProgress(self, event: "QueryProgressEvent") -> None:
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Notes
        -----
        This method is asynchronous. The status in
        :class:`~pyspark.sql.streaming.StreamingQuery` will always be the latest no matter
        when this method is called. Therefore, the status of
        :class:`~pyspark.sql.streaming.StreamingQuery` may have changed before or while you
        process the event. For example, you may find that
        :class:`~pyspark.sql.streaming.StreamingQuery` is terminated when you are processing
        `QueryProgressEvent`.
        """
        pass

    # NOTE: Do not mark this as an abstract method, since we released this abstract class
    # without this method in a prior version, and marking it as abstract would break
    # existing implementations.
    def onQueryIdle(self, event: "QueryIdleEvent") -> None:
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    @abstractmethod
    def onQueryTerminated(self, event: "QueryTerminatedEvent") -> None:
        """
        Called when a query is stopped, with or without error.
        """
        pass

    @property
    def _jlistener(self) -> "JavaObject":
        from pyspark import SparkContext

        if hasattr(self, "_jlistenerobj"):
            return self._jlistenerobj

        self._jlistenerobj: "JavaObject" = (
            SparkContext._jvm.PythonStreamingQueryListenerWrapper(  # type: ignore[union-attr]
                JStreamingQueryListener(self)
            )
        )
        return self._jlistenerobj

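
# An illustrative sketch (not part of this module): a concrete listener that does
# something with each event, plus registration and removal. The class name
# ``ProgressPrinter`` is hypothetical; ``addListener``/``removeListener`` are methods
# of ``spark.streams``, and the event attributes used are defined below in this file.
#
#   class ProgressPrinter(StreamingQueryListener):
#       def onQueryStarted(self, event):
#           print(f"started: name={event.name} runId={event.runId}")
#
#       def onQueryProgress(self, event):
#           print(event.progress.prettyJson)
#
#       def onQueryIdle(self, event):
#           pass
#
#       def onQueryTerminated(self, event):
#           print(f"terminated: runId={event.runId} exception={event.exception}")
#
#   listener = ProgressPrinter()
#   spark.streams.addListener(listener)
#   ...
#   spark.streams.removeListener(listener)
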
class JStreamingQueryListener:
    """
    Python class that implements the Java interface via Py4J.
    """

    def __init__(self, pylistener: StreamingQueryListener) -> None:
        self.pylistener = pylistener

    def onQueryStarted(self, jevent: "JavaObject") -> None:
        self.pylistener.onQueryStarted(QueryStartedEvent.fromJObject(jevent))

    def onQueryProgress(self, jevent: "JavaObject") -> None:
        self.pylistener.onQueryProgress(QueryProgressEvent.fromJObject(jevent))

    def onQueryIdle(self, jevent: "JavaObject") -> None:
        self.pylistener.onQueryIdle(QueryIdleEvent.fromJObject(jevent))

    def onQueryTerminated(self, jevent: "JavaObject") -> None:
        self.pylistener.onQueryTerminated(QueryTerminatedEvent.fromJObject(jevent))

    class Java:
        implements = ["org.apache.spark.sql.streaming.PythonStreamingQueryListener"]


class QueryStartedEvent:
    """
    Event representing the start of a query.

    .. versionadded:: 3.4.0

    Notes
    -----
    This API is evolving.
    """

    def __init__(
        self, id: uuid.UUID, runId: uuid.UUID, name: Optional[str], timestamp: str
    ) -> None:
        self._id: uuid.UUID = id
        self._runId: uuid.UUID = runId
        self._name: Optional[str] = name
        self._timestamp: str = timestamp

    @classmethod
    def fromJObject(cls, jevent: "JavaObject") -> "QueryStartedEvent":
        return cls(
            id=uuid.UUID(jevent.id().toString()),
            runId=uuid.UUID(jevent.runId().toString()),
            name=jevent.name(),
            timestamp=jevent.timestamp(),
        )

    @classmethod
    def fromJson(cls, j: Dict[str, Any]) -> "QueryStartedEvent":
        return cls(
            id=uuid.UUID(j["id"]),
            runId=uuid.UUID(j["runId"]),
            name=j["name"],
            timestamp=j["timestamp"],
        )

    @property
    def id(self) -> uuid.UUID:
        """
        A unique query id that persists across restarts. See
        :py:meth:`~pyspark.sql.streaming.StreamingQuery.id`.
        """
        return self._id

    @property
    def runId(self) -> uuid.UUID:
        """
        A query id that is unique for every start/restart. See
        :py:meth:`~pyspark.sql.streaming.StreamingQuery.runId`.
        """
        return self._runId

    @property
    def name(self) -> Optional[str]:
        """
        User-specified name of the query, `None` if not specified.
        """
        return self._name

    @property
    def timestamp(self) -> str:
        """
        The timestamp to start a query.
        """
        return self._timestamp


class QueryProgressEvent:
    """
    Event representing any progress updates in a query.

    .. versionadded:: 3.4.0

    Notes
    -----
    This API is evolving.
    """

    def __init__(self, progress: "StreamingQueryProgress") -> None:
        self._progress: StreamingQueryProgress = progress

    @classmethod
    def fromJObject(cls, jevent: "JavaObject") -> "QueryProgressEvent":
        return cls(progress=StreamingQueryProgress.fromJObject(jevent.progress()))

    @classmethod
    def fromJson(cls, j: Dict[str, Any]) -> "QueryProgressEvent":
        return cls(progress=StreamingQueryProgress.fromJson(j["progress"]))

    @property
    def progress(self) -> "StreamingQueryProgress":
        """
        The query progress updates.
        """
        return self._progress


class QueryIdleEvent:
    """
    Event representing that the query is idle and waiting for new data to process.

    .. versionadded:: 3.5.0

    Notes
    -----
    This API is evolving.
    """

    def __init__(self, id: uuid.UUID, runId: uuid.UUID, timestamp: str) -> None:
        self._id: uuid.UUID = id
        self._runId: uuid.UUID = runId
        self._timestamp: str = timestamp

    @classmethod
    def fromJObject(cls, jevent: "JavaObject") -> "QueryIdleEvent":
        return cls(
            id=uuid.UUID(jevent.id().toString()),
            runId=uuid.UUID(jevent.runId().toString()),
            timestamp=jevent.timestamp(),
        )

    @classmethod
    def fromJson(cls, j: Dict[str, Any]) -> "QueryIdleEvent":
        return cls(id=uuid.UUID(j["id"]), runId=uuid.UUID(j["runId"]), timestamp=j["timestamp"])

    @property
    def id(self) -> uuid.UUID:
        """
        A unique query id that persists across restarts. See
        :py:meth:`~pyspark.sql.streaming.StreamingQuery.id`.
        """
        return self._id

    @property
    def runId(self) -> uuid.UUID:
        """
        A query id that is unique for every start/restart. See
        :py:meth:`~pyspark.sql.streaming.StreamingQuery.runId`.
        """
        return self._runId

    @property
    def timestamp(self) -> str:
        """
        The timestamp when the latest no-batch trigger happened.
        """
        return self._timestamp


class QueryTerminatedEvent:
    """
    Event representing the termination of a query.

    .. versionadded:: 3.4.0

    Notes
    -----
    This API is evolving.
    """

    def __init__(
        self,
        id: uuid.UUID,
        runId: uuid.UUID,
        exception: Optional[str],
        errorClassOnException: Optional[str],
    ) -> None:
        self._id: uuid.UUID = id
        self._runId: uuid.UUID = runId
        self._exception: Optional[str] = exception
        self._errorClassOnException: Optional[str] = errorClassOnException

    @classmethod
    def fromJObject(cls, jevent: "JavaObject") -> "QueryTerminatedEvent":
        jexception = jevent.exception()
        jerrorclass = jevent.errorClassOnException()
        return cls(
            id=uuid.UUID(jevent.id().toString()),
            runId=uuid.UUID(jevent.runId().toString()),
            exception=jexception.get() if jexception.isDefined() else None,
            errorClassOnException=jerrorclass.get() if jerrorclass.isDefined() else None,
        )

    @classmethod
    def fromJson(cls, j: Dict[str, Any]) -> "QueryTerminatedEvent":
        return cls(
            id=uuid.UUID(j["id"]),
            runId=uuid.UUID(j["runId"]),
            exception=j["exception"],
            errorClassOnException=j["errorClassOnException"],
        )

    @property
    def id(self) -> uuid.UUID:
        """
        A unique query id that persists across restarts. See
        :py:meth:`~pyspark.sql.streaming.StreamingQuery.id`.
        """
        return self._id

    @property
    def runId(self) -> uuid.UUID:
        """
        A query id that is unique for every start/restart. See
        :py:meth:`~pyspark.sql.streaming.StreamingQuery.runId`.
        """
        return self._runId

    @property
    def exception(self) -> Optional[str]:
        """
        The exception message of the query if the query was terminated with an exception.
        Otherwise, it will be `None`.
        """
        return self._exception

    @property
    def errorClassOnException(self) -> Optional[str]:
        """
        The error class from the exception if the query was terminated with an exception
        which is a part of the error class framework. If the query was terminated without
        an exception, or the exception is not a part of the error class framework,
        it will be `None`.

        .. versionadded:: 3.5.0
        """
        return self._errorClassOnException

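
# A minimal sketch (not part of this module) of how a listener's ``onQueryTerminated``
# callback might inspect the event defined above. The ``notify_oncall`` helper is
# hypothetical; ``exception`` and ``errorClassOnException`` are the ``QueryTerminatedEvent``
# properties defined above.
#
#   class AlertingListener(StreamingQueryListener):
#       ...
#       def onQueryTerminated(self, event):
#           if event.exception is not None:
#               # errorClassOnException is only set for error-class exceptions.
#               notify_oncall(event.runId, event.errorClassOnException, event.exception)
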
"""def__init__(self,id:uuid.UUID,runId:uuid.UUID,name:Optional[str],timestamp:str,batchId:int,batchDuration:int,durationMs:Dict[str,int],eventTime:Dict[str,str],stateOperators:List["StateOperatorProgress"],sources:List["SourceProgress"],sink:"SinkProgress",numInputRows:int,inputRowsPerSecond:float,processedRowsPerSecond:float,observedMetrics:Dict[str,Row],jprogress:Optional["JavaObject"]=None,jdict:Optional[Dict[str,Any]]=None,):self._jprogress:Optional["JavaObject"]=jprogressself._jdict:Optional[Dict[str,Any]]=jdictself._id:uuid.UUID=idself._runId:uuid.UUID=runIdself._name:Optional[str]=nameself._timestamp:str=timestampself._batchId:int=batchIdself._batchDuration:int=batchDurationself._durationMs:Dict[str,int]=durationMsself._eventTime:Dict[str,str]=eventTimeself._stateOperators:List[StateOperatorProgress]=stateOperatorsself._sources:List[SourceProgress]=sourcesself._sink:SinkProgress=sinkself._numInputRows:int=numInputRowsself._inputRowsPerSecond:float=inputRowsPerSecondself._processedRowsPerSecond:float=processedRowsPerSecondself._observedMetrics:Dict[str,Row]=observedMetrics@classmethoddeffromJObject(cls,jprogress:"JavaObject")->"StreamingQueryProgress":frompysparkimportSparkContextreturncls(jprogress=jprogress,id=uuid.UUID(jprogress.id().toString()),runId=uuid.UUID(jprogress.runId().toString()),name=jprogress.name(),timestamp=jprogress.timestamp(),batchId=jprogress.batchId(),batchDuration=jprogress.batchDuration(),durationMs=dict(jprogress.durationMs()),eventTime=dict(jprogress.eventTime()),stateOperators=[StateOperatorProgress.fromJObject(js)forjsinjprogress.stateOperators()],sources=[SourceProgress.fromJObject(js)forjsinjprogress.sources()],sink=SinkProgress.fromJObject(jprogress.sink()),numInputRows=jprogress.numInputRows(),inputRowsPerSecond=jprogress.inputRowsPerSecond(),processedRowsPerSecond=jprogress.processedRowsPerSecond(),observedMetrics={k:cloudpickle.loads(SparkContext._jvm.PythonSQLUtils.toPyRow(jr)# type: ignore[union-attr])fork,jrindict(jprogress.observedMetrics()).items()},)@classmethoddeffromJson(cls,j:Dict[str,Any])->"StreamingQueryProgress":returncls(jdict=j,id=uuid.UUID(j["id"]),runId=uuid.UUID(j["runId"]),name=j["name"],timestamp=j["timestamp"],batchId=j["batchId"],batchDuration=j["batchDuration"],durationMs=dict(j["durationMs"])if"durationMs"injelse{},eventTime=dict(j["eventTime"])if"eventTime"injelse{},stateOperators=[StateOperatorProgress.fromJson(s)forsinj["stateOperators"]],sources=[SourceProgress.fromJson(s)forsinj["sources"]],sink=SinkProgress.fromJson(j["sink"]),numInputRows=j["numInputRows"],inputRowsPerSecond=j["inputRowsPerSecond"],processedRowsPerSecond=j["processedRowsPerSecond"],observedMetrics={k:Row(*row_dict.keys())(*row_dict.values())# Assume no nested rowsfork,row_dictinj["observedMetrics"].items()}if"observedMetrics"injelse{},)@propertydefid(self)->uuid.UUID:""" A unique query id that persists across restarts. See py:meth:`~pyspark.sql.streaming.StreamingQuery.id`. """returnself._id@propertydefrunId(self)->uuid.UUID:""" A query id that is unique for every start/restart. See py:meth:`~pyspark.sql.streaming.StreamingQuery.runId`. """returnself._runId@propertydefname(self)->Optional[str]:""" User-specified name of the query, `None` if not specified. """returnself._name@propertydeftimestamp(self)->str:""" The timestamp to start a query. """returnself._timestamp@propertydefbatchId(self)->int:""" A unique id for the current batch of data being processed. Note that in the case of retries after a failure a given batchId my be executed more than once. 
Similarly, when there is no data to be processed, the batchId will not be incremented. """returnself._batchId@propertydefbatchDuration(self)->int:""" The process duration of each batch. """returnself._batchDuration@propertydefdurationMs(self)->Dict[str,int]:""" The amount of time taken to perform various operations in milliseconds. """returnself._durationMs@propertydefeventTime(self)->Dict[str,str]:""" Statistics of event time seen in this batch. It may contain the following keys: .. code-block:: python { "max": "2016-12-05T20:54:20.827Z", # maximum event time seen in this trigger "min": "2016-12-05T20:54:20.827Z", # minimum event time seen in this trigger "avg": "2016-12-05T20:54:20.827Z", # average event time seen in this trigger "watermark": "2016-12-05T20:54:20.827Z" # watermark used in this trigger } All timestamps are in ISO8601 format, i.e. UTC timestamps. """returnself._eventTime@propertydefstateOperators(self)->List["StateOperatorProgress"]:""" Information about operators in the query that store state. """returnself._stateOperators@propertydefsources(self)->List["SourceProgress"]:""" detailed statistics on data being read from each of the streaming sources. """returnself._sources@propertydefsink(self)->"SinkProgress":""" A unique query id that persists across restarts. See py:meth:`~pyspark.sql.streaming.StreamingQuery.id`. """returnself._sink@propertydefobservedMetrics(self)->Dict[str,Row]:returnself._observedMetrics@propertydefnumInputRows(self)->int:""" The aggregate (across all sources) number of records processed in a trigger. """returnself._numInputRows@propertydefinputRowsPerSecond(self)->float:""" The aggregate (across all sources) rate of data arriving. """returnself._inputRowsPerSecond@propertydefprocessedRowsPerSecond(self)->float:""" The aggregate (across all sources) rate at which Spark is processing data. """returnself._processedRowsPerSecond@propertydefjson(self)->str:""" The compact JSON representation of this progress. """assertself._jdictisnotNoneorself._jprogressisnotNoneifself._jprogress:returnself._jprogress.json()else:returnjson.dumps(self._jdict)@propertydefprettyJson(self)->str:""" The pretty (i.e. indented) JSON representation of this progress. """assertself._jdictisnotNoneorself._jprogressisnotNoneifself._jprogress:returnself._jprogress.prettyJson()else:returnjson.dumps(self._jdict,indent=4)def__str__(self)->str:returnself.prettyJsonclassStateOperatorProgress:""" .. versionadded:: 3.4.0 Notes ----- This API is evolving. 
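
# A minimal sketch (not part of this module) of reading a few ``StreamingQueryProgress``
# fields from inside ``onQueryProgress``. The observed-metric name "row_count" is a
# hypothetical key chosen by the user via ``DataFrame.observe``; "triggerExecution" is a
# timing key Spark commonly reports in ``durationMs``; the attributes used are the
# properties defined above.
#
#   def onQueryProgress(self, event):
#       p = event.progress
#       trigger_ms = p.durationMs.get("triggerExecution")
#       print(f"batch {p.batchId}: {p.numInputRows} rows, trigger took {trigger_ms} ms")
#       row = p.observedMetrics.get("row_count")
#       if row is not None:
#           print(row.asDict())
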
"""def__init__(self,operatorName:str,numRowsTotal:int,numRowsUpdated:int,numRowsRemoved:int,allUpdatesTimeMs:int,allRemovalsTimeMs:int,commitTimeMs:int,memoryUsedBytes:int,numRowsDroppedByWatermark:int,numShufflePartitions:int,numStateStoreInstances:int,customMetrics:Dict[str,int],jprogress:Optional["JavaObject"]=None,jdict:Optional[Dict[str,Any]]=None,):self._jprogress:Optional["JavaObject"]=jprogressself._jdict:Optional[Dict[str,Any]]=jdictself._operatorName:str=operatorNameself._numRowsTotal:int=numRowsTotalself._numRowsUpdated:int=numRowsUpdatedself._numRowsRemoved:int=numRowsRemovedself._allUpdatesTimeMs:int=allUpdatesTimeMsself._allRemovalsTimeMs:int=allRemovalsTimeMsself._commitTimeMs:int=commitTimeMsself._memoryUsedBytes:int=memoryUsedBytesself._numRowsDroppedByWatermark:int=numRowsDroppedByWatermarkself._numShufflePartitions:int=numShufflePartitionsself._numStateStoreInstances:int=numStateStoreInstancesself._customMetrics:Dict[str,int]=customMetrics@classmethoddeffromJObject(cls,jprogress:"JavaObject")->"StateOperatorProgress":returncls(jprogress=jprogress,operatorName=jprogress.operatorName(),numRowsTotal=jprogress.numRowsTotal(),numRowsUpdated=jprogress.numRowsUpdated(),allUpdatesTimeMs=jprogress.allUpdatesTimeMs(),numRowsRemoved=jprogress.numRowsRemoved(),allRemovalsTimeMs=jprogress.allRemovalsTimeMs(),commitTimeMs=jprogress.commitTimeMs(),memoryUsedBytes=jprogress.memoryUsedBytes(),numRowsDroppedByWatermark=jprogress.numRowsDroppedByWatermark(),numShufflePartitions=jprogress.numShufflePartitions(),numStateStoreInstances=jprogress.numStateStoreInstances(),customMetrics=dict(jprogress.customMetrics()),)@classmethoddeffromJson(cls,j:Dict[str,Any])->"StateOperatorProgress":returncls(jdict=j,operatorName=j["operatorName"],numRowsTotal=j["numRowsTotal"],numRowsUpdated=j["numRowsUpdated"],numRowsRemoved=j["numRowsRemoved"],allUpdatesTimeMs=j["allUpdatesTimeMs"],allRemovalsTimeMs=j["allRemovalsTimeMs"],commitTimeMs=j["commitTimeMs"],memoryUsedBytes=j["memoryUsedBytes"],numRowsDroppedByWatermark=j["numRowsDroppedByWatermark"],numShufflePartitions=j["numShufflePartitions"],numStateStoreInstances=j["numStateStoreInstances"],customMetrics=dict(j["customMetrics"])if"customMetrics"injelse{},)@propertydefoperatorName(self)->str:returnself._operatorName@propertydefnumRowsTotal(self)->int:returnself._numRowsTotal@propertydefnumRowsUpdated(self)->int:returnself._numRowsUpdated@propertydefallUpdatesTimeMs(self)->int:returnself._allUpdatesTimeMs@propertydefnumRowsRemoved(self)->int:returnself._numRowsRemoved@propertydefallRemovalsTimeMs(self)->int:returnself._allRemovalsTimeMs@propertydefcommitTimeMs(self)->int:returnself._commitTimeMs@propertydefmemoryUsedBytes(self)->int:returnself._memoryUsedBytes@propertydefnumRowsDroppedByWatermark(self)->int:returnself._numRowsDroppedByWatermark@propertydefnumShufflePartitions(self)->int:returnself._numShufflePartitions@propertydefnumStateStoreInstances(self)->int:returnself._numStateStoreInstances@propertydefcustomMetrics(self)->Dict[str,int]:returnself._customMetrics@propertydefjson(self)->str:""" The compact JSON representation of this progress. """assertself._jdictisnotNoneorself._jprogressisnotNoneifself._jprogress:returnself._jprogress.json()else:returnjson.dumps(self._jdict)@propertydefprettyJson(self)->str:""" The pretty (i.e. indented) JSON representation of this progress. 
"""assertself._jdictisnotNoneorself._jprogressisnotNoneifself._jprogress:returnself._jprogress.prettyJson()else:returnjson.dumps(self._jdict,indent=4)def__str__(self)->str:returnself.prettyJsonclassSourceProgress:""" .. versionadded:: 3.4.0 Notes ----- This API is evolving. """def__init__(self,description:str,startOffset:str,endOffset:str,latestOffset:str,numInputRows:int,inputRowsPerSecond:float,processedRowsPerSecond:float,metrics:Dict[str,str],jprogress:Optional["JavaObject"]=None,jdict:Optional[Dict[str,Any]]=None,)->None:self._jprogress:Optional["JavaObject"]=jprogressself._jdict:Optional[Dict[str,Any]]=jdictself._description:str=descriptionself._startOffset:str=startOffsetself._endOffset:str=endOffsetself._latestOffset:str=latestOffsetself._numInputRows:int=numInputRowsself._inputRowsPerSecond:float=inputRowsPerSecondself._processedRowsPerSecond:float=processedRowsPerSecondself._metrics:Dict[str,str]=metrics@classmethoddeffromJObject(cls,jprogress:"JavaObject")->"SourceProgress":returncls(jprogress=jprogress,description=jprogress.description(),startOffset=str(jprogress.startOffset()),endOffset=str(jprogress.endOffset()),latestOffset=str(jprogress.latestOffset()),numInputRows=jprogress.numInputRows(),inputRowsPerSecond=jprogress.inputRowsPerSecond(),processedRowsPerSecond=jprogress.processedRowsPerSecond(),metrics=dict(jprogress.metrics()),)@classmethoddeffromJson(cls,j:Dict[str,Any])->"SourceProgress":returncls(jdict=j,description=j["description"],startOffset=str(j["startOffset"]),endOffset=str(j["endOffset"]),latestOffset=str(j["latestOffset"]),numInputRows=j["numInputRows"],inputRowsPerSecond=j["inputRowsPerSecond"],processedRowsPerSecond=j["processedRowsPerSecond"],metrics=dict(j["metrics"])if"metrics"injelse{},)@propertydefdescription(self)->str:""" Description of the source. """returnself._description@propertydefstartOffset(self)->str:""" The starting offset for data being read. """returnself._startOffset@propertydefendOffset(self)->str:""" The ending offset for data being read. """returnself._endOffset@propertydeflatestOffset(self)->str:""" The latest offset from this source. """returnself._latestOffset@propertydefnumInputRows(self)->int:""" The number of records read from this source. """returnself._numInputRows@propertydefinputRowsPerSecond(self)->float:""" The rate at which data is arriving from this source. """returnself._inputRowsPerSecond@propertydefprocessedRowsPerSecond(self)->float:""" The rate at which data from this source is being processed by Spark. """returnself._processedRowsPerSecond@propertydefmetrics(self)->Dict[str,str]:returnself._metrics@propertydefjson(self)->str:""" The compact JSON representation of this progress. """assertself._jdictisnotNoneorself._jprogressisnotNoneifself._jprogress:returnself._jprogress.json()else:returnjson.dumps(self._jdict)@propertydefprettyJson(self)->str:""" The pretty (i.e. indented) JSON representation of this progress. """assertself._jdictisnotNoneorself._jprogressisnotNoneifself._jprogress:returnself._jprogress.prettyJson()else:returnjson.dumps(self._jdict,indent=4)def__str__(self)->str:returnself.prettyJsonclassSinkProgress:""" .. versionadded:: 3.4.0 Notes ----- This API is evolving. 
"""def__init__(self,description:str,numOutputRows:int,metrics:Dict[str,str],jprogress:Optional["JavaObject"]=None,jdict:Optional[Dict[str,Any]]=None,)->None:self._jprogress:Optional["JavaObject"]=jprogressself._jdict:Optional[Dict[str,Any]]=jdictself._description:str=descriptionself._numOutputRows:int=numOutputRowsself._metrics:Dict[str,str]=metrics@classmethoddeffromJObject(cls,jprogress:"JavaObject")->"SinkProgress":returncls(jprogress=jprogress,description=jprogress.description(),numOutputRows=jprogress.numOutputRows(),metrics=dict(jprogress.metrics()),)@classmethoddeffromJson(cls,j:Dict[str,Any])->"SinkProgress":returncls(jdict=j,description=j["description"],numOutputRows=j["numOutputRows"],metrics=dict(j["metrics"])if"metrics"injelse{},)@propertydefdescription(self)->str:""" Description of the source. """returnself._description@propertydefnumOutputRows(self)->int:""" Number of rows written to the sink or -1 for Continuous Mode (temporarily) or Sink V1 (until decommissioned). """returnself._numOutputRows@propertydefmetrics(self)->Dict[str,str]:returnself._metrics@propertydefjson(self)->str:""" The compact JSON representation of this progress. """assertself._jdictisnotNoneorself._jprogressisnotNoneifself._jprogress:returnself._jprogress.json()else:returnjson.dumps(self._jdict)@propertydefprettyJson(self)->str:""" The pretty (i.e. indented) JSON representation of this progress. """assertself._jdictisnotNoneorself._jprogressisnotNoneifself._jprogress:returnself._jprogress.prettyJson()else:returnjson.dumps(self._jdict,indent=4)def__str__(self)->str:returnself.prettyJsondef_test()->None:importsysimportdoctestimportosfrompyspark.sqlimportSparkSessionimportpyspark.sql.streaming.listenerfrompy4j.protocolimportPy4JErroros.chdir(os.environ["SPARK_HOME"])globs=pyspark.sql.streaming.listener.__dict__.copy()try:spark=SparkSession._getActiveSessionOrCreate()exceptPy4JError:# noqa: F821spark=SparkSession(sc)# type: ignore[name-defined] # noqa: F821globs["spark"]=spark(failure_count,test_count)=doctest.testmod(pyspark.sql.streaming.listener,globs=globs,)globs["spark"].stop()iffailure_count:sys.exit(-1)if__name__=="__main__":_test()