Consumer class

class mdml_client.kafka_mdml_consumer(topics, group, auto_offset_reset='earliest', show_mdml_time=True, kafka_host='merf.egs.anl.gov', kafka_port=9092, schema_host='merf.egs.anl.gov', schema_port=8081)

Creates a consumer to consume messages from an MDML instance.

Parameters:
  • topics (list(str)) – Topics to consume from
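  • group (str) – Consumer group ID. Each message is consumed only once per group ID; a consumer that reuses a group ID resumes from that group's last committed offset.
  • auto_offset_reset (str) – ‘earliest’ (the default) or ‘latest’. Determines where consumption starts when the group has no committed offset: ‘earliest’ starts from the oldest message still retained in the topic, while ‘latest’ starts from messages produced after the consumer is started. If the group already has committed offsets, consumption resumes from where the group left off.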
  • show_mdml_time (bool) – Whether to show (True) or suppress (False) the value of ‘mdml_time’
  • kafka_host (str) – Host name of the Kafka broker
  • kafka_port (int) – Port of the Kafka broker
  • schema_host (str) – Host name of the Kafka schema registry
  • schema_port (int) – Port of the Kafka schema registry
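How a group's committed offsets interact with auto_offset_reset follows Kafka's general consumer rule rather than anything specific to MDML. The sketch below is a simplified model of that rule for a single partition, not code from mdml_client; the function name starting_offset and its arguments are hypothetical.

```python
from typing import Optional


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    auto_offset_reset: str = "earliest") -> int:
    """Simplified model of where a consumer group starts reading one partition.

    Hypothetical helper for illustration only (not part of mdml_client).
    committed:  the group's committed offset, or None if the group has never
                committed one (for example, a brand-new group ID).
    log_start:  offset of the oldest message still retained by the broker.
    log_end:    offset that the next produced message will receive.
    """
    if committed is not None and log_start <= committed <= log_end:
        # A valid committed offset wins; auto_offset_reset is not consulted.
        return committed
    if auto_offset_reset == "earliest":
        return log_start  # replay everything still retained
    if auto_offset_reset == "latest":
        return log_end  # only messages produced from now on
    raise ValueError("auto_offset_reset must be 'earliest' or 'latest'")


print(starting_offset(None, 0, 10))            # new group, 'earliest' -> 0
print(starting_offset(None, 0, 10, "latest"))  # new group, 'latest'   -> 10
print(starting_offset(4, 0, 10, "latest"))     # existing group        -> 4
```

In this model, switching a long-lived group from ‘earliest’ to ‘latest’ changes nothing until its committed offset is missing or out of range; to replay a topic from the start, a new group ID is the simpler lever.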

close()

Shuts down the consumer, ensuring that received messages have been acknowledged by Kafka.
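Since close() is what ensures received messages have been acknowledged by Kafka, it should run even when processing fails partway through. One way to guarantee this is contextlib.closing from the Python standard library, sketched below under the assumption that the consumer is an mdml_client.kafka_mdml_consumer (or any object with the same consume() and close() methods). The helper name consume_and_close and the handle callback are hypothetical, and the topic and group names in the usage comment are placeholders.

```python
import contextlib
from typing import Any, Callable


def consume_and_close(consumer: Any, handle: Callable[[dict], None], **consume_kwargs: Any) -> int:
    """Pass every message from consumer.consume() to handle(), then close the consumer.

    Hypothetical helper. contextlib.closing calls consumer.close() on exit,
    whether the loop ends normally (for example, after overall_timeout)
    or handle() raises. Returns the number of messages handled.
    """
    count = 0
    with contextlib.closing(consumer):
        for message in consumer.consume(**consume_kwargs):
            handle(message)
            count += 1
    return count


# Example usage (topic and group names are placeholders):
# import mdml_client
# consumer = mdml_client.kafka_mdml_consumer(topics=["my_topic"], group="my_group")
# consume_and_close(consumer, print, overall_timeout=60.0)
```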

consume(poll_timeout=1.0, overall_timeout=300.0, verbose=True)

Starts consuming from the subscribed topics. This method is a generator that yields one message at a time.

Parameters:
  • poll_timeout (float) – Maximum time, in seconds, to wait for a single message
  • overall_timeout (float) – Time, in seconds, to wait without receiving a message before the consume generator shuts down. The timer restarts every time a new message is received
  • verbose (bool) – Whether to print notes about the consumer when the consume loop starts

Yields:
  • dict – A dictionary containing the topic and value of a single message
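Together, poll_timeout and overall_timeout behave like an idle timer: each poll waits at most poll_timeout, and the generator stops once overall_timeout elapses with no new message. The sketch below models that documented behavior; it is not mdml_client's implementation, and the poll callable (returning a message or None) and the injectable clock are assumptions made so the logic can be shown in isolation.

```python
import time
from typing import Any, Callable, Iterator, Optional


def idle_timeout_loop(poll: Callable[[float], Optional[Any]],
                      poll_timeout: float = 1.0,
                      overall_timeout: float = 300.0,
                      clock: Callable[[], float] = time.monotonic) -> Iterator[Any]:
    """Yield messages until overall_timeout seconds pass without a new one.

    Hypothetical model of the documented consume() timeouts.
    poll(timeout) must return a message, or None if nothing arrived in time.
    """
    last_message = clock()
    while clock() - last_message < overall_timeout:
        msg = poll(poll_timeout)
        if msg is None:
            continue  # nothing arrived within poll_timeout; keep waiting
        last_message = clock()  # every new message restarts the overall timer
        yield msg
```

Because every message resets the timer, a topic that keeps receiving data keeps the generator alive indefinitely; overall_timeout only bounds idle periods.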

consume_chunks(poll_timeout=1.0, overall_timeout=300.0, save_file=True, save_dir='.', passthrough=True, verbose=True)

Consumes messages from topics that contain chunked messages and reassembles the original file. By default, the file is saved to disk.

Parameters:
  • poll_timeout (float) – Maximum time, in seconds, to wait for a single message to reach the consumer
  • overall_timeout (float) – Time, in seconds, after which the consumer shuts down if no messages are received
  • save_file (bool) – If True, the reassembled file is saved to save_dir. If False, the original data contained in the file is yielded as bytes instead
  • save_dir (str) – Directory in which to save files
  • passthrough (bool) – If multiple topics are subscribed to and one of them does not use chunking, passthrough=True ensures that messages from that topic are still yielded by the generator
  • verbose (bool) – Whether to print details about the consumer on start

Yields:
  • tuple – A tuple containing (timestamp, data) where timestamp is the time the first chunk of the message was sent and where data is either a filepath (save_file=True) or the bytes of the file that was chunked and streamed (save_file=False).
  • dict – If passthrough=True and a message is received from a topic that does not use chunking, a dictionary containing the topic and value of the message is yielded. Otherwise, a tuple is yielded.
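With passthrough=True, consume_chunks() can yield two different shapes, and the second element of each tuple depends on save_file, so downstream code usually branches on both. A minimal sketch, assuming only the yield types documented above; the helper name summarize_chunk_results is hypothetical.

```python
import os
from typing import Any, Dict, Iterable, List, Tuple, Union

ChunkItem = Union[Tuple[Any, Union[str, bytes]], Dict[str, Any]]


def summarize_chunk_results(
    results: Iterable[ChunkItem],
) -> Tuple[List[Tuple[Any, int]], List[Dict[str, Any]]]:
    """Split consume_chunks() output into (timestamp, size_in_bytes) pairs and passthrough dicts.

    Hypothetical helper for illustration.
    """
    files: List[Tuple[Any, int]] = []
    passthrough: List[Dict[str, Any]] = []
    for item in results:
        if isinstance(item, dict):
            # Message from a topic without chunking (only yielded when passthrough=True).
            passthrough.append(item)
            continue
        timestamp, data = item
        if isinstance(data, (bytes, bytearray)):
            size = len(data)  # save_file=False: data is the file's contents
        else:
            size = os.path.getsize(data)  # save_file=True: data is a path to the saved file
        files.append((timestamp, size))
    return files, passthrough


# Example usage with a live consumer (arguments are placeholders):
# files, others = summarize_chunk_results(consumer.consume_chunks(save_file=False))
```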