MDML Python Client documentation
This client connects users to the Manufacturing Data and Machine Learning Platform at Argonne National Laboratory.
Producer class
class mdml_client.kafka_mdml_producer(topic, schema=None, config=None, add_time=True, kafka_host='merf.egs.anl.gov', kafka_port=9092, schema_host='merf.egs.anl.gov', schema_port=8081)
Creates a producer instance for producing data to an MDML instance.
Parameters: - topic (str) – Topic to send under
- schema (dict or str) – JSON schema for the message value. If dict, value is used as the schema. If string, value is used as a file path to a json file.
- config (dict) – Confluent Kafka client config (only recommended for advanced usage - overwrites other parameters)
- add_time (bool) – If True, adds a value named ‘mdml_time’ to the data object that represents when the producer sent the message
- kafka_host (str) – Host name of the kafka broker
- kafka_port (int) – Port used for the kafka broker
- schema_host (str) – Host name of the kafka schema registry
- schema_port (int) – Port of the kafka schema registry
flush()
Flush (send) any messages currently waiting in the producer.
produce(data, key=None, partition=None)
Produce data to the supplied topic.
Parameters: - data (dict) – Dictionary of the data
- key (str) – String for the Kafka assignor to use to calculate a partition
- partition (int) – Number of the partition to assign the message to
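A minimal usage sketch is shown below. The topic name, schema contents, and message fields are placeholders (and the schema is written as plain JSON Schema, which is an assumption); the default broker and schema registry hosts from the signature above are used.

import mdml_client as mdml

# Hypothetical topic and schema; replace with values for your MDML instance.
example_schema = {
    "title": "Example sensor data",
    "description": "Temperature readings from an example device",
    "type": "object",
    "properties": {
        "temperature": {"type": "number"}
    },
    "required": ["temperature"]
}

producer = mdml.kafka_mdml_producer(
    topic="mdml-example-sensor",
    schema=example_schema
)
producer.produce({"temperature": 21.5})
producer.flush()  # ensure the message actually leaves the client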
Consumer class
class mdml_client.kafka_mdml_consumer(topics, group, auto_offset_reset='earliest', show_mdml_time=True, kafka_host='merf.egs.anl.gov', kafka_port=9092, schema_host='merf.egs.anl.gov', schema_port=8081)
Creates a consumer to consume messages from an MDML instance.
Parameters: - topics (list(str)) – Topics to consume from
- group (str) – Consumer group ID. Messages are only consumed by a given group ID once.
- auto_offset_reset (str) – ‘earliest’ or ‘latest’. ‘earliest’ is the default and will start consuming messages from where the consumer group left off. ‘latest’ will start consuming messages from the time that the consumer is started.
- show_mdml_time (bool) – Indicator if the value of ‘mdml_time’ should be shown or suppressed
- kafka_host (str) – Host name of the kafka broker
- kafka_port (int) – Port used for the kafka broker
- schema_host (str) – Host name of the kafka schema registry
- schema_port (int) – Port of the kafka schema registry
close()
Closes down the consumer. Ensures that received messages have been acknowledged by Kafka.
consume(poll_timeout=1.0, overall_timeout=300.0, verbose=True)
Start consuming from the subscribed topics.
Parameters: - poll_timeout (float) – Timeout to wait when consuming one message
- overall_timeout (float) – Timeout to wait until the consume generator is closed down. This timeout is restarted every time a new message is received
- verbose (bool) – Print a message with notes when the consume loop starts
Yields: dict – A dictionary containing the topic and value of a single message
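A minimal consume loop is sketched below; the topic and group ID are placeholders, and each yielded dict is assumed to carry the message's topic and value as described above.

import mdml_client as mdml

# Hypothetical topic and consumer group.
consumer = mdml.kafka_mdml_consumer(
    topics=["mdml-example-sensor"],
    group="example-analysis"
)
try:
    for msg in consumer.consume(overall_timeout=60.0):
        print(msg)
finally:
    consumer.close()  # acknowledge received messages before shutting down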
consume_chunks(poll_timeout=1.0, overall_timeout=300.0, save_file=True, save_dir='.', passthrough=True, verbose=True)
Consume messages from a topic that contains chunked messages. The original file is saved to disk by default.
Parameters: - poll_timeout (float) – Timeout for one message to reach the consumer
- overall_timeout (float) – Time until the consumer will be shutdown if no messages are received
- save_file (bool) – True if the chunked file should be saved. False will return the original data contained in the file
- save_dir (str) – Directory to save files
- passthrough (bool) – If multiple topics are subscribed to and one of them is not using chunking, passthrough=True will ensure those messages are still yielded by the generator
- verbose (bool) – Print details regarding the consumer on start
Yields: - tuple – A tuple containing (timestamp, data) where timestamp is the time the first chunk of the message was sent and where data is either a filepath (save_file=True) or the bytes of the file that was chunked and streamed (save_file=False).
- dict – If passthrough=True is used and a message from a topic without chunking is received, a dictionary containing the topic and value of the message will be yielded. Otherwise, a tuple is yielded.
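The sketch below consumes chunked files and writes them to disk; the topic, group, and save directory are placeholders, and the isinstance check handles the passthrough case described above.

import mdml_client as mdml

# Hypothetical topic and group; reassembled files are written to ./downloads.
consumer = mdml.kafka_mdml_consumer(
    topics=["mdml-example-files"],
    group="example-files"
)
try:
    for result in consumer.consume_chunks(save_dir="./downloads"):
        if isinstance(result, tuple):
            sent_time, data = result   # data is a filepath when save_file=True
            print(f"File sent at {sent_time} written to {data}")
        else:
            print(result)              # passthrough message from a non-chunked topic
finally:
    consumer.close()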
Schema-less Producer & Consumer classes
class mdml_client.kafka_mdml_producer_schemaless(topic, config=None, kafka_host='merf.egs.anl.gov', kafka_port=9092)
Creates a schema-less producer instance for interacting with the MDML.
Parameters: - topic (str) – Topic to send under
- config (dict) – Confluent Kafka client config
- kafka_host (str) – Host name of the kafka broker
- kafka_port (int) – Port used for the Kafka broker
flush()
Flush (send) any messages currently waiting in the producer.
produce(data, key=None, partition=None)
Produce data to the supplied topic.
Parameters: - data (dict) – Dictionary of the data
- key (string) – Key of the message (used in determining a partition) - not required
- partition (int) – Partition used to save the message - not required
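A minimal sketch follows; the topic name and message contents are placeholders, and no schema registry lookup is performed.

import mdml_client as mdml

# Hypothetical topic name.
producer = mdml.kafka_mdml_producer_schemaless(topic="mdml-example-raw")
producer.produce({"temperature": 21.5, "note": "sent without schema validation"})
producer.flush()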
class mdml_client.kafka_mdml_consumer_schemaless(topics, group, kafka_host='merf.egs.anl.gov', kafka_port=9092)
Creates a schema-less consumer instance for interacting with the MDML.
Parameters: - topics (list(str)) – Topics to consume from
- group (str) – Consumer group ID. Messages are only consumed by a given group ID once.
- kafka_host (str) – Host name of the kafka broker
- kafka_port (int) – Port used for the kafka broker
close()
Closes down the consumer. Ensures that received messages have been acknowledged by Kafka.
consume(poll_timeout=1.0, overall_timeout=300.0, verbose=True)
Yields: dict – A dictionary containing the topic and value of a single message
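A minimal sketch mirroring the schema-less producer example above; the topic and group ID are placeholders.

import mdml_client as mdml

consumer = mdml.kafka_mdml_consumer_schemaless(
    topics=["mdml-example-raw"],
    group="example-raw"
)
try:
    for msg in consumer.consume(overall_timeout=60.0):
        print(msg)   # dict with the topic and value of the message
finally:
    consumer.close()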
Experiment & Replay Service Functions
mdml_client.start_experiment(id, topics, producer_kwargs={})
Start an experiment with the MDML Experiment service. Messages produced on all of the specified topics will be saved to a file and uploaded to S3.
Parameters: - id (str) – Unique ID for the experiment
- topics (list(str)) – Topics to consume from that make up the experiment
- producer_kwargs (dict) – Dictionary that is passed as kwargs to the underlying producer in this function. Parameter names should be the same as those in a kafka_mdml_producer.
mdml_client.stop_experiment(id, producer_kwargs={})
Stop a previously started experiment. Upon stopping, the experiment service will package all data streamed during an experiment, verify all data is present, and write a file to S3.
Parameters: - id (str) – Unique ID for the experiment
- producer_kwargs (dict) – Dictionary that is passed as kwargs to the underlying producer in this function. Parameter names should be the same as those in a kafka_mdml_producer.
mdml_client.replay_experiment(experiment_id, speed=1, producer_kwargs={})
Replay an experiment, streaming data back down its original topics.
Parameters: - experiment_id (str) – Unique ID of the experiment to replay
- speed (int) – Speed multiplier used during the replay
- producer_kwargs (dict) – Dictionary of kwargs for this function's internal producer
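The sketch below ties the three functions together; the experiment ID and topic names are placeholders.

import mdml_client as mdml

# Hypothetical experiment ID and topics.
mdml.start_experiment(
    id="example-run-001",
    topics=["mdml-example-sensor", "mdml-example-files"]
)

# ... produce data on the topics above while the experiment is running ...

mdml.stop_experiment(id="example-run-001")

# Later, stream the recorded data back down its original topics at 2x speed.
mdml.replay_experiment(experiment_id="example-run-001", speed=2)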
MDML S3 Client
This is used for “coat-checking” large files.
class mdml_client.kafka_mdml_s3_client(topic, s3_endpoint=None, s3_access_key=None, s3_secret_key=None, kafka_host='merf.egs.anl.gov', kafka_port=9092, schema_host='merf.egs.anl.gov', schema_port=8081, schema=None)
Creates an MDML producer for sending >1MB files to an S3 location. Simultaneously, the MDML sends upload information along a Kafka topic to be received by a client that can retrieve the file.
Parameters: - topic (str) – Topic to send under
- s3_endpoint (str) – Host of the S3 service
- s3_access_key (str) – S3 access key
- s3_secret_key (str) – S3 secret key
- kafka_host (str) – Host name of the kafka broker
- kafka_port (int) – Port used for the kafka broker
- schema_host (str) – Host name of the kafka schema registry
- schema_port (int) – Port of the kafka schema registry
- schema (dict or str) – Schema of the messages sent on the supplied topic. Default schema sends a dictionary containing the time of upload and the location for retrieval. If dict, value is used as the schema. If string, value is used as a file path to a json file.
consume(bucket, object_name, save_filepath)
Gets a file from an S3 bucket. Can return the bytes of the file or save the file to a specified path.
Parameters: - bucket (str) – Name of the bucket the object is saved in
- object_name (str) – Name/key of the object to retrieve from the bucket
- save_filepath (str) – Path in which to save the downloaded file. Using a value of None will return the bytes of the file instead of saving to a file
produce(filepath, obj_name, payload=None)
Produce data to the supplied S3 endpoint and Kafka topic.
Parameters: - filepath (str) – Path of the file to upload to the S3 bucket
- obj_name (str) – Name to store the file under
- payload (dict) – Payload for the message sent on the Kafka topic. Only used when the default schema has been overridden.
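A minimal round-trip sketch follows; the topic, S3 endpoint, credentials, bucket, and file names are all placeholders.

import mdml_client as mdml

# Hypothetical topic, endpoint, credentials, bucket, and file names.
s3_client = mdml.kafka_mdml_s3_client(
    topic="mdml-example-large-files",
    s3_endpoint="https://s3.example.org",
    s3_access_key="EXAMPLE_ACCESS_KEY",
    s3_secret_key="EXAMPLE_SECRET_KEY"
)

# Upload a large file; a small message describing the upload goes out on the topic.
s3_client.produce(filepath="scan_0001.tiff", obj_name="scan_0001.tiff")

# Retrieve the object later and save it locally.
s3_client.consume(
    bucket="mdml-example-bucket",
    object_name="scan_0001.tiff",
    save_filepath="./downloads/scan_0001.tiff"
)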
Helper Functions
mdml_client.create_schema(d, title, descr, required_keys=None, add_time=False)
Create a schema for use in a kafka_mdml_producer object. An example of the data object that will be produced is needed to create the schema.
Parameters: - d (dict) – Data object to translate into a schema
- title (str) – Title of the schema
- descr (str) – Description of the schema
- required_keys (list(str)) – List of strings of the keys that are required in the schema
Returns: Schema dictionary compatible with kafka_mdml_producer
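A short sketch of generating a schema from an example message and handing it to a producer; the field names and topic are placeholders.

import mdml_client as mdml

# Build a schema from an example data object.
example_msg = {"temperature": 21.5, "pressure": 101.3}
schema = mdml.create_schema(
    example_msg,
    title="Example sensor data",
    descr="Temperature and pressure readings from an example device",
    required_keys=["temperature", "pressure"]
)
producer = mdml.kafka_mdml_producer(topic="mdml-example-sensor", schema=schema)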
mdml_client.chunk_file(fn, chunk_size, use_b64=True, encoding='utf-8', file_id=None)
Chunks a file into parts. Yields dictionaries containing the file bytes encoded in base64. Base64 is used since the Kafka producer requires a string and some files must be opened in byte format.
Parameters: - fn (str) – Path to the file
- chunk_size (int) – Size of chunk to use
- use_b64 (bool) – True to return the file bytes as a base64 encoded string
- encoding (string) – Encoding to use to open the file if use_b64 is False
- file_id (string) – File ID to use in the chunking process if the fn param is not suitable
Yields: dict – A dictionary containing a chunk of data and the metadata required to piece all of the chunks back together.
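The sketch below streams the chunks of a file one message at a time; the file path, chunk size, and topic are placeholders, and pairing chunk_file with a schema-less producer is an assumption about how the chunk messages might be sent.

import mdml_client as mdml

# Hypothetical file and topic.
producer = mdml.kafka_mdml_producer_schemaless(topic="mdml-example-chunks")
for chunk in mdml.chunk_file("scan_0001.tiff", chunk_size=500000):
    producer.produce(chunk)   # each chunk dict carries data plus reassembly metadata
producer.flush()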