Consumer class
class mdml_client.kafka_mdml_consumer(topics, group, auto_offset_reset='earliest', show_mdml_time=True, kafka_host='merf.egs.anl.gov', kafka_port=9092, schema_host='merf.egs.anl.gov', schema_port=8081)
Creates a consumer to consume messages from an MDML instance.
Parameters: - topics (list(str)) – Topics to consume from
- group (str) – Consumer group ID. Each message is consumed only once per group ID.
- auto_offset_reset (str) – ‘earliest’ or ‘latest’. A consumer group that has already consumed the topic resumes from where it left off. The setting determines where a group without committed offsets starts: ‘earliest’ (the default) starts from the oldest available message, while ‘latest’ starts with messages produced after the consumer is started.
- show_mdml_time (bool) – Whether the value of ‘mdml_time’ should be shown (True) or suppressed (False)
- kafka_host (str) – Host name of the Kafka broker
- kafka_port (int) – Port of the Kafka broker
- schema_host (str) – Host name of the Kafka schema registry
- schema_port (int) – Port of the Kafka schema registry
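The interaction between group and auto_offset_reset follows Kafka's usual consumer-group rules, assuming mdml_client passes the setting through to Kafka unchanged. The sketch below models those rules for a single partition; starting_offset and its arguments are hypothetical names used only for illustration and are not part of mdml_client.

```python
from typing import Optional


def starting_offset(
    committed: Optional[int],
    log_start: int,
    log_end: int,
    auto_offset_reset: str = "earliest",
) -> int:
    """Return the offset a consumer group would start reading one partition from.

    Hypothetical model of standard Kafka semantics (not mdml_client code):
    a valid committed group offset always wins, and auto_offset_reset is
    only consulted when the group has no usable committed offset.
    """
    if auto_offset_reset not in ("earliest", "latest"):
        raise ValueError("auto_offset_reset must be 'earliest' or 'latest'")
    # A group that already consumed this partition resumes where it left off,
    # provided that offset is still inside the retained log.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No usable committed offset: fall back to the reset policy.
    return log_start if auto_offset_reset == "earliest" else log_end
```

For example, a brand-new group created with kafka_mdml_consumer(['my_topic'], 'new_group', auto_offset_reset='latest') (topic and group names are hypothetical) would skip messages produced before it started, whereas restarting an existing group resumes from its committed offset under either setting.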
close()
Closes down the consumer and ensures that received messages have been acknowledged by Kafka.
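Because close() is what ensures received messages are acknowledged, it is worth calling even when the consume loop exits early or raises. A minimal sketch using the standard library's contextlib.closing, assuming message dictionaries use the keys "topic" and "value"; FakeConsumer and collect_values are hypothetical stand-ins so the example runs without a Kafka broker.

```python
from contextlib import closing


class FakeConsumer:
    """Hypothetical stand-in for kafka_mdml_consumer, used only so the sketch runs."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.closed = False

    def consume(self, poll_timeout=1.0, overall_timeout=300.0, verbose=True):
        # Mimics the documented generator: yields one message dict at a time.
        yield from self._messages

    def close(self):
        self.closed = True


def collect_values(consumer, limit):
    """Collect up to `limit` message values, always closing the consumer afterwards."""
    values = []
    # closing() calls consumer.close() when the block exits, even on an exception.
    with closing(consumer):
        for msg in consumer.consume(verbose=False):
            values.append(msg["value"])
            if len(values) >= limit:
                break
    return values
```

With a real kafka_mdml_consumer in place of FakeConsumer, the closing() context manager guarantees that close() runs on every exit path, including an early break or an exception raised inside the loop.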
consume(poll_timeout=1.0, overall_timeout=300.0, verbose=True)
Starts consuming from the topics specified when the consumer was created.
Parameters: - poll_timeout (float) – Maximum time to wait for a single message
- overall_timeout (float) – Time without receiving any message after which the consume generator shuts down. The timer restarts every time a new message is received
- verbose (bool) – If True, print a message with notes when the consume loop starts
Yields: dict – A dictionary containing the topic and value of a single message
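The two timeouts interact as follows: each poll waits at most poll_timeout for one message, and the generator stops once overall_timeout has elapsed without any message, with the timer restarting on every message. The sketch below models that loop under stated assumptions; it is not mdml_client's implementation, poll is a hypothetical callable that returns one message or None after waiting up to its timeout, and the units are presumably seconds although this reference does not state them.

```python
import time
from typing import Callable, Iterator, Optional


def consume_with_timeouts(
    poll: Callable[[float], Optional[dict]],
    poll_timeout: float = 1.0,
    overall_timeout: float = 300.0,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[dict]:
    """Yield messages until none has arrived for `overall_timeout`.

    `poll(timeout)` is a hypothetical callable: it waits up to `timeout`
    and returns one message dict, or None if nothing arrived.
    """
    last_message = clock()
    while clock() - last_message < overall_timeout:
        msg = poll(poll_timeout)
        if msg is None:
            continue  # nothing this poll; keep waiting until the overall timeout
        last_message = clock()  # restart the overall timer on every message
        yield msg
```

Injecting clock makes the behaviour testable with simulated time; in real use time.monotonic is a sensible default because, unlike wall-clock time, it cannot jump backwards when the system clock is adjusted.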
consume_chunks(poll_timeout=1.0, overall_timeout=300.0, save_file=True, save_dir='.', passthrough=True, verbose=True)
Consumes messages from topics that contain chunked messages and reassembles the original file. By default, the file is saved to disk.
Parameters: - poll_timeout (float) – Maximum time to wait for a single message to reach the consumer
- overall_timeout (float) – Time without receiving any message after which the consumer shuts down
- save_file (bool) – If True, the reassembled file is saved to disk. If False, the original data contained in the file is returned instead
- save_dir (str) – Directory in which to save files (used when save_file=True)
- passthrough (bool) – If multiple topics are subscribed to and one of them does not use chunking, passthrough=True ensures that its messages are still yielded by the generator
- verbose (bool) – If True, print details about the consumer when it starts
Yields: - tuple – A tuple containing (timestamp, data) where timestamp is the time the first chunk of the message was sent and where data is either a filepath (save_file=True) or the bytes of the file that was chunked and streamed (save_file=False).
- dict – If passthrough=True is used and a message from a topic without chunking is received, a dictionary containing the topic and value of the message is yielded. Otherwise, a tuple is yielded.
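The MDML's actual chunk format is not described in this reference, so the sketch below assumes a hypothetical one: each chunked message value is a dict with "id", "part" (0-based index), "total", "timestamp", "filename" and "data" (bytes). It illustrates how save_file, save_dir and passthrough shape what the generator yields; reassemble_chunks is an illustrative name, not part of mdml_client.

```python
import os
from typing import Iterable, Iterator, Union


def reassemble_chunks(
    messages: Iterable[dict],
    save_file: bool = True,
    save_dir: str = ".",
    passthrough: bool = True,
) -> Iterator[Union[tuple, dict]]:
    """Rebuild chunked files from a message stream (hypothetical chunk format).

    Each message is {"topic": ..., "value": ...}. A chunked value is assumed to
    be a dict with "id", "part", "total", "timestamp", "filename" and "data".
    """
    pending = {}  # chunk id -> {"timestamp", "filename", "parts": {index: bytes}}
    for msg in messages:
        value = msg["value"]
        if not (isinstance(value, dict) and "part" in value):
            # Message from a topic without chunking.
            if passthrough:
                yield msg
            continue
        entry = pending.setdefault(
            value["id"], {"timestamp": None, "filename": value["filename"], "parts": {}}
        )
        if value["part"] == 0:
            entry["timestamp"] = value["timestamp"]  # time the first chunk was sent
        entry["parts"][value["part"]] = value["data"]
        if len(entry["parts"]) < value["total"]:
            continue  # still waiting for more chunks (they may arrive out of order)
        del pending[value["id"]]
        data = b"".join(entry["parts"][i] for i in range(value["total"]))
        if save_file:
            path = os.path.join(save_dir, entry["filename"])
            with open(path, "wb") as fh:
                fh.write(data)
            yield (entry["timestamp"], path)
        else:
            yield (entry["timestamp"], data)
```

Since the generator can yield either kind of item when passthrough=True, callers can branch on isinstance(item, tuple) to separate reassembled files from passed-through messages.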