Schema-less Producer & Consumer classes

class mdml_client.kafka_mdml_producer_schemaless(topic, config=None, kafka_host='merf.egs.anl.gov', kafka_port=9092)

Creates a schemaless Producer instance for interacting with the MDML.

Parameters:
  • topic (str) – Topic to send under
  • config (dict) – Confluent Kafka client config
  • kafka_host (str) – Host name of the kafka broker
  • kafka_port (int) – Port used for the Kafka broker
flush()

Flush (send) any messages currently waiting in the producer.

produce(data, key=None, partition=None)

Produce data to the supplied topic

Parameters:
  • data (dict) – Dictionary of the data
  • key (string) – Key of the message (used in determining a partition) - not required
  • partition (int) – Partition used to save the message - not required
class mdml_client.kafka_mdml_consumer_schemaless(topics, group, kafka_host='merf.egs.anl.gov', kafka_port=9092)

Creates a serializingProducer instance for interacting with the MDML.

Parameters:
  • topics (list(str)) – Topics to consume from
  • group (str) – Consumer group ID. Messages are only consumed by a given group ID once.
  • kafka_host (str) – Host name of the kafka broker
  • kafka_port (int) – Port used for the kafka broker
close()

Closes down the consumer. Ensures that received messages have been acknowledged by Kafka.

consume(poll_timeout=1.0, overall_timeout=300.0, verbose=True)
Yields:dict – A dictionary containing the topic and value of a single message