Adding comments to YAML produced with PyYaml
You probably have some representer for the MyObj class; by default, dumping it with PyYAML (print(yaml.dump(MyObj()))) will give you:
!!python/object:__main__.MyObj {}
PyYAML can only do one thing with the comments in your desired output: discard them. If you were to read that desired output back in, you would end up with a dict containing a dict ({'boby': {'age': 34}}); you would not get a MyObj() instance, because there is no tag information.
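To see this concretely (assuming PyYAML is installed), loading the commented document returns plain nested dicts and silently drops both comments:

```python
import yaml  # PyYAML

doc = """\
boby:     # this is the name
  age: 34 # in years
"""

data = yaml.safe_load(doc)
# The comments are discarded and no MyObj is constructed:
# data == {'boby': {'age': 34}}
```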
The enhanced version for PyYAML that I developed (ruamel.yaml) can read in YAML with comments, preserve the comments and write comments when dumping.
If you read in your desired output, the resulting data will look (and act) like a dict containing a dict, but in reality there is a more complex data structure that can handle the comments. You can, however, create that structure when ruamel.yaml asks you to dump an instance of MyObj, and if you add the comments at that time, you will get your desired output.
from __future__ import print_function

import sys
import ruamel.yaml
from ruamel.yaml.comments import CommentedMap


class MyObj():
    name = "boby"
    age = 34

    def convert_to_yaml_struct(self):
        x = CommentedMap()
        a = CommentedMap()
        x[self.name] = a
        x.yaml_add_eol_comment('this is the name', self.name, 11)
        a['age'] = self.age
        a.yaml_add_eol_comment('in years', 'age', 11)
        return x

    @staticmethod
    def yaml_representer(dumper, data, flow_style=False):
        assert isinstance(dumper, ruamel.yaml.RoundTripDumper)
        return dumper.represent_dict(data.convert_to_yaml_struct())


ruamel.yaml.RoundTripDumper.add_representer(MyObj, MyObj.yaml_representer)

ruamel.yaml.round_trip_dump(MyObj(), sys.stdout)
Which prints:

boby:     # this is the name
  age: 34 # in years
There is no need to wait with creating the CommentedMap instances until you want to represent the MyObj instance. I would, e.g., make name and age into properties that get/set values from/on the appropriate CommentedMap. That way you could more easily add the comments before the yaml_representer static method is called to represent the MyObj instance.
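A minimal sketch of that property-based suggestion (a plain dict stands in for ruamel.yaml's CommentedMap here so the example runs without ruamel.yaml installed; in real code you would build CommentedMap instances and call yaml_add_eol_comment() in the setters):

```python
class MyObj:
    """Sketch: name/age properties backed by the mapping that gets dumped.

    A plain dict stands in for ruamel.yaml's CommentedMap; with
    ruamel.yaml you would also attach EOL comments in the setters.
    """
    def __init__(self, name='boby', age=34):
        self._map = {}          # would be CommentedMap()
        self._map[name] = {}    # would be CommentedMap()
        self._name = name
        self.age = age

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        # Re-key the outer mapping so the dumped YAML uses the new name
        self._map[value] = self._map.pop(self._name)
        self._name = value

    @property
    def age(self):
        return self._map[self._name]['age']

    @age.setter
    def age(self, value):
        self._map[self._name]['age'] = value
```

The representer can then simply hand the already-maintained self._map to dumper.represent_dict, with no conversion step at dump time.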
Here is a solution I came up with; it's a bit complex, but less complex than ruamel.yaml, as it works entirely with the plain PyYAML API and does not round-trip comments (so it would not be an appropriate answer to this other question). It's probably not as robust overall, as I have not tested it extensively, but it seems good enough for my use case, which is that I want dicts/mappings to be able to have comments, both for the entire mapping and per item.
I believe that round-tripping comments (in this limited context) would also be possible with a similar approach, but I have not tried it, as it's not currently a use case I have.
Finally, while this solution does not implement adding per-item comments to items in lists/sequences (as this is not something I need at the moment), it could easily be extended to do so.
First, as in ruamel, we need a sort of CommentedMapping
class, which associates comments with each key in a Mapping. There are many possible approaches to this; mine is just one:
from collections.abc import Mapping, MutableMapping

class CommentedMapping(MutableMapping):
    def __init__(self, d, comment=None, comments=None):
        self.mapping = d
        self.comment = comment
        self.comments = dict(comments) if comments else {}

    def get_comment(self, *path):
        if not path:
            return self.comment

        # Look the key up in self (recursively) and raise a
        # KeyError or other exception if such a key does not
        # exist in the nested structure
        sub = self.mapping
        for p in path:
            if isinstance(sub, CommentedMapping):
                # Subvert comment copying
                sub = sub.mapping[p]
            else:
                sub = sub[p]

        comment = None
        if len(path) == 1:
            comment = self.comments.get(path[0])
        if comment is None:
            comment = self.comments.get(path)
        return comment

    def __getitem__(self, item):
        val = self.mapping[item]
        if (isinstance(val, (dict, Mapping)) and
                not isinstance(val, CommentedMapping)):
            comment = self.get_comment(item)
            comments = {k[1:]: v for k, v in self.comments.items()
                        if isinstance(k, tuple) and len(k) > 1 and k[0] == item}
            val = self.__class__(val, comment=comment, comments=comments)
        return val

    def __setitem__(self, item, value):
        self.mapping[item] = value

    def __delitem__(self, item):
        del self.mapping[item]
        for k in list(self.comments):
            if k == item or (isinstance(k, tuple) and k and k[0] == item):
                del self.comments[k]

    def __iter__(self):
        return iter(self.mapping)

    def __len__(self):
        return len(self.mapping)

    def __repr__(self):
        return f'{type(self).__name__}({self.mapping}, comment={self.comment!r}, comments={self.comments})'
This class has both a .comment attribute, so that it can carry an overall comment for the mapping, and a .comments attribute containing per-key comments. It also allows adding comments for keys in nested dicts, by specifying the key path as a tuple. E.g. comments={('c', 'd'): 'comment'} specifies a comment for the key 'd' in the nested dict at 'c'. When getting items from a CommentedMapping, if the item's value is a dict/Mapping, it is also wrapped in a CommentedMapping in a way that preserves its comments. This is useful for recursive calls into the YAML representer for nested structures.
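The tuple key-path convention can be seen in isolation with just the standard library (the keys and comment strings below are purely illustrative):

```python
# How __getitem__ narrows the comments dict when a nested mapping is
# accessed: tuple keys whose first element matches the accessed item
# are kept, with that leading element stripped off.
comments = {
    'a': 'a comment',
    ('c', 'd'): 'd comment',
    ('c', 'e', 'f'): 'deep comment',
}
item = 'c'
sub_comments = {k[1:]: v for k, v in comments.items()
                if isinstance(k, tuple) and len(k) > 1 and k[0] == item}
# sub_comments == {('d',): 'd comment', ('e', 'f'): 'deep comment'}
```

Note that get_comment('d') on the nested mapping first tries comments.get('d') and then falls back to comments.get(('d',)), so the one-element tuple key left over after stripping still resolves.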
Next we need to implement a custom YAML Dumper, which takes care of the full process of serializing an object to YAML. A Dumper is a complicated class composed from four other classes: an Emitter, a Serializer, a Representer, and a Resolver. Of these we only have to implement the first three; Resolvers are more concerned with, e.g., how implicit scalars like 1 get resolved to the correct type, as well as determining the default tags for various values, and are not really involved here.
First we implement the representer. The representer is responsible for recognizing different Python types and mapping them to their appropriate nodes in the native YAML data structure/representation graph. Namely, these include scalar nodes, sequence nodes, and mapping nodes. For example, the base Representer class includes a representer for Python dicts which converts them to a MappingNode (each item in the dict in turn consists of a pair of ScalarNodes, one for each key and one for each value).
In order to attach comments to entire mappings, as well as to each key in a mapping, we introduce two new Node
types which are not formally part of the YAML specification:
from yaml.node import Node, ScalarNode, MappingNode

class CommentedNode(Node):
    """Dummy base class for all nodes with attached comments."""

class CommentedScalarNode(ScalarNode, CommentedNode):
    def __init__(self, tag, value, start_mark=None, end_mark=None, style=None,
                 comment=None):
        super().__init__(tag, value, start_mark, end_mark, style)
        self.comment = comment

class CommentedMappingNode(MappingNode, CommentedNode):
    def __init__(self, tag, value, start_mark=None, end_mark=None,
                 flow_style=None, comment=None, comments={}):
        super().__init__(tag, value, start_mark, end_mark, flow_style)
        self.comment = comment
        self.comments = comments
We then add a CommentedRepresenter, which includes code for representing a CommentedMapping as a CommentedMappingNode. In fact, it just reuses the base class's code for representing a mapping, but converts the returned MappingNode to a CommentedMappingNode. It also converts each key from a ScalarNode to a CommentedScalarNode. We base it on SafeRepresenter here since I don't need serialization of arbitrary Python objects:
from yaml.representer import SafeRepresenter

class CommentedRepresenter(SafeRepresenter):
    def represent_commented_mapping(self, data):
        node = super().represent_dict(data)
        comments = {k: data.get_comment(k) for k in data}
        value = []
        for k, v in node.value:
            if k.value in comments:
                k = CommentedScalarNode(
                    k.tag, k.value,
                    k.start_mark, k.end_mark, k.style,
                    comment=comments[k.value])
            value.append((k, v))
        node = CommentedMappingNode(
            node.tag,
            value,
            flow_style=False,  # commented dicts must be in block style
                               # this could be implemented differently for
                               # flow-style maps, but for my case I only want
                               # block style, and it makes things much simpler
            comment=data.get_comment(),
            comments=comments
        )
        return node

    yaml_representers = SafeRepresenter.yaml_representers.copy()
    yaml_representers[CommentedMapping] = represent_commented_mapping
Next we need to implement a subclass of Serializer. The serializer is responsible for walking the representation graph of nodes and, for each node, outputting one or more events to the emitter, which is a complicated (and sometimes difficult to follow) state machine that receives a stream of events and outputs the appropriate YAML markup for each event (e.g. there is a MappingStartEvent which, when received, will output a { if it's a flow-style mapping, and/or add the appropriate level of indentation for subsequent output, up to the corresponding MappingEndEvent).
Point being, the new serializer must output events representing comments, so that the emitter can know when it needs to emit a comment. This is handled simply by adding a CommentEvent and emitting one every time a CommentedMappingNode or CommentedScalarNode is encountered in the representation:
from yaml import Event
from yaml.serializer import Serializer

class CommentEvent(Event):
    """
    Simple stream event representing a comment to be output to the stream.
    """
    def __init__(self, value, start_mark=None, end_mark=None):
        super().__init__(start_mark, end_mark)
        self.value = value

class CommentedSerializer(Serializer):
    def serialize_node(self, node, parent, index):
        if (node not in self.serialized_nodes and
                isinstance(node, CommentedNode) and
                not (isinstance(node, CommentedMappingNode) and
                     isinstance(parent, CommentedMappingNode))):
            # Emit CommentEvents, but only if the current node is not a
            # CommentedMappingNode nested in another CommentedMappingNode (in
            # which case we would have already emitted its comment via the
            # parent mapping)
            self.emit(CommentEvent(node.comment))

        super().serialize_node(node, parent, index)
Next, the Emitter needs to be subclassed to handle CommentEvents. This is perhaps the trickiest part, since, as I wrote, the emitter is a bit complex and fragile, and written in such a way that it's difficult to modify the state machine (I am tempted to rewrite it more clearly, but don't have time right now). So I experimented with a number of different solutions.
The key method here is Emitter.emit, which processes the event stream and calls "state" methods that perform some action depending on what state the machine is in, which is in turn affected by what events appear in the stream. An important realization is that the stream processing is suspended in many cases while waiting for more events to come in; this is what the Emitter.need_more_events method is responsible for. In some cases, before the current event can be handled, more events need to come in first. For example, in the case of MappingStartEvent at least 3 more events need to be buffered on the stream: the first key/value pair, and possibly the next key. The Emitter needs to know, before it can begin formatting a map, if there are one or more items in the map, and possibly also the length of the first key/value pair. The number of events required before the current event can be handled is hard-coded in the need_more_events method.
The problem is that this does not account for the now-possible presence of CommentEvents on the event stream, which should not impact processing of other events. Therefore the Emitter.need_events method must be overridden to account for the presence of CommentEvents. E.g. if the current event is MappingStartEvent, and there are 3 subsequent events buffered, if one of those is a CommentEvent we can't count it, so we'll need at a minimum 4 events (in case the next one is one of the expected events in a mapping).
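The counting can be illustrated with plain Python (strings stand in for event objects here; the real code works on Event instances):

```python
# The emitter's hard-coded lookahead for a mapping start is 3 events.
# Buffered CommentEvents must not count toward that lookahead, so the
# required count is raised by one per buffered comment (capped at the
# original count, matching the need_events override below).
buffered = ['MappingStart', 'Comment', 'Scalar', 'Scalar']
count = 3

comments = [e for e in buffered if e == 'Comment']
needed = count + min(count, len(comments))
# needed == 4: the comment in the buffer means one extra real event
# must arrive before the mapping can be formatted
```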
Finally, every time a CommentEvent
is encountered on the stream, we forcibly break out of the current event processing loop to handle writing the comment, then pop the CommentEvent
off the stream and continue as if nothing happened. This is the end result:
import textwrap

from yaml.emitter import Emitter

class CommentedEmitter(Emitter):
    def need_more_events(self):
        if self.events and isinstance(self.events[0], CommentEvent):
            # If the next event is a comment, always break out of the event
            # handling loop so that we divert it for comment handling
            return True
        return super().need_more_events()

    def need_events(self, count):
        # Hack-y: the minimal number of queued events needed to start
        # a block-level event is hard-coded, and does not account for
        # possible comment events, so here we increase the necessary
        # count for every comment event
        comments = [e for e in self.events if isinstance(e, CommentEvent)]
        return super().need_events(count + min(count, len(comments)))

    def emit(self, event):
        if self.events and isinstance(self.events[0], CommentEvent):
            # Write the comment, then pop it off the event stream and continue
            # as normal
            self.write_comment(self.events[0].value)
            self.events.pop(0)

        super().emit(event)

    def write_comment(self, comment):
        indent = self.indent or 0
        width = self.best_width - indent - 2  # 2 for the comment prefix '# '
        lines = ['# ' + line for line in textwrap.wrap(comment, width)]
        for line in lines:
            if self.encoding:
                line = line.encode(self.encoding)
            self.write_indent()
            self.stream.write(line)
            self.write_line_break()
I also experimented with different approaches to the implementation of write_comment
. The Emitter
base class has its own method (write_plain
) which can handle writing text to the stream with appropriate indentation and line-wrapping. However, it's not quite flexible enough to handle something like comments, where each line needs to be prefixed with something like '# '
. One technique I tried was monkey-patching the write_indent
method to handle this case, but in the end it was too ugly. I found that simply using Python's built-in textwrap.wrap
was sufficient for my case.
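For reference, the wrapping logic on its own, using only the standard library (the width values here are just example numbers; Emitter.best_width defaults to 80 in PyYAML):

```python
import textwrap

best_width = 80                    # Emitter.best_width default
indent = 2                         # example current indentation level
width = best_width - indent - 2    # reserve 2 columns for the '# ' prefix

comment = 'long string ' * 10
lines = ['# ' + line for line in textwrap.wrap(comment, width)]

# Every emitted line fits within best_width once the indent is added
assert all(indent + len(line) <= best_width for line in lines)
assert all(line.startswith('# ') for line in lines)
```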
Next, we create the dumper by subclassing the existing SafeDumper
but inserting our new classes into the MRO:
from yaml import SafeDumper

class CommentedDumper(CommentedEmitter, CommentedSerializer,
                      CommentedRepresenter, SafeDumper):
    """
    Extension of `yaml.SafeDumper` that supports writing `CommentedMapping`s
    with all comments output as YAML comments.
    """
Here's an example usage:
>>> import yaml
>>> d = CommentedMapping({
... 'a': 1,
... 'b': 2,
... 'c': {'d': 3},
... }, comment='my commented dict', comments={
... 'a': 'a comment',
... 'b': 'b comment',
... 'c': 'long string ' * 44,
... ('c', 'd'): 'd comment'
... })
>>> print(yaml.dump(d, Dumper=CommentedDumper))
# my commented dict
# a comment
a: 1
# b comment
b: 2
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string
c:
  # d comment
  d: 3
I still haven't tested this solution very extensively, and it likely still contains bugs. I'll update it as I use it more and find corner-cases, etc.