Adding comments to YAML produced with PyYaml

You probably have a representer for the MyObj class, as by default dumping (print(yaml.dump(MyObj()))) with PyYAML will give you:

!!python/object:__main__.MyObj {}

PyYAML can only do one thing with the comments in your desired output: discard them. If you were to read that desired output back in, you would end up with a dict containing a dict ({'boby': {'age': 34}}); you would not get a MyObj() instance, because there is no tag information.
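You can verify this with plain PyYAML (a quick sketch of loading the desired output):

```python
import yaml

doc = """\
boby:      # this is the name
  age: 34  # in years
"""

data = yaml.safe_load(doc)
print(data)  # -> {'boby': {'age': 34}} -- the comments are simply discarded
```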

The enhanced version of PyYAML that I developed (ruamel.yaml) can read in YAML with comments, preserve the comments, and write the comments when dumping. If you read your desired output, the resulting data will look (and act) like a dict containing a dict, but in reality there is a more complex data structure that can handle the comments. You can, however, create that structure when ruamel.yaml asks you to dump an instance of MyObj; if you add the comments at that time, you will get your desired output.

from __future__ import print_function

import sys
import ruamel.yaml
from ruamel.yaml.comments import CommentedMap


class MyObj():
    name = "boby"
    age = 34

    def convert_to_yaml_struct(self):
        x = CommentedMap()
        a = CommentedMap()
        x[self.name] = a
        x.yaml_add_eol_comment('this is the name', 'boby', 11)
        a['age'] = self.age
        a.yaml_add_eol_comment('in years', 'age', 11)
        return x

    @staticmethod
    def yaml_representer(dumper, data, flow_style=False):
        assert isinstance(dumper, ruamel.yaml.RoundTripDumper)
        return dumper.represent_dict(data.convert_to_yaml_struct())


ruamel.yaml.RoundTripDumper.add_representer(MyObj, MyObj.yaml_representer)

ruamel.yaml.round_trip_dump(MyObj(), sys.stdout)

Which prints:

boby:      # this is the name
  age: 34  # in years

There is no need to wait to create the CommentedMap instances until you want to represent the MyObj instance. I would, e.g., make name and age into properties that get/set values from/on the appropriate CommentedMap. That way you could more easily add the comments before the yaml_representer static method is called to represent the MyObj instance.


Here is a solution I came up with; it's a bit complex, but less complex than ruamel.yaml, as it works entirely with the plain PyYAML API and does not round-trip comments (so it would not be an appropriate answer to this other question). It's probably not as robust overall, as I have not tested it extensively, but it seems good enough for my use case, which is that I want dicts/mappings to be able to have comments, both for the entire mapping and per item.

I believe that round-tripping comments, in this limited context, would also be possible with a similar approach, but I have not tried it, as it's not currently a use case I have.

Finally, while this solution does not implement adding per-item comments for items in lists/sequences (as this is not something I need at the moment), it could easily be extended to do so.

First, as in ruamel, we need a sort of CommentedMapping class, which associates comments with each key in a Mapping. There are many possible approaches to this; mine is just one:

from collections.abc import Mapping, MutableMapping

class CommentedMapping(MutableMapping):
    def __init__(self, d, comment=None, comments=None):
        self.mapping = d
        self.comment = comment
        # default to None to avoid a shared mutable default argument
        self.comments = comments if comments is not None else {}

    def get_comment(self, *path):
        if not path:
            return self.comment

        # Look the key up in self (recursively) and raise a
        # KeyError or other exception if such a key does not
        # exist in the nested structure
        sub = self.mapping
        for p in path:
            if isinstance(sub, CommentedMapping):
                # Subvert comment copying
                sub = sub.mapping[p]
            else:
                sub = sub[p]

        comment = None
        if len(path) == 1:
            comment = self.comments.get(path[0])
        if comment is None:
            comment = self.comments.get(path)
        return comment

    def __getitem__(self, item):
        val = self.mapping[item]
        if (isinstance(val, (dict, Mapping)) and
                not isinstance(val, CommentedMapping)):
            comment = self.get_comment(item)
            comments = {k[1:]: v for k, v in self.comments.items()
                        if isinstance(k, tuple) and len(k) > 1 and k[0] == item}
            val = self.__class__(val, comment=comment, comments=comments)
        return val

    def __setitem__(self, item, value):
        self.mapping[item] = value

    def __delitem__(self, item):
        del self.mapping[item]
        for k in list(self.comments):
            if k == item or (isinstance(k, tuple) and k and k[0] == item):
                del self.comments[k]

    def __iter__(self):
        return iter(self.mapping)

    def __len__(self):
        return len(self.mapping)

    def __repr__(self):
        return f'{type(self).__name__}({self.mapping}, comment={self.comment!r}, comments={self.comments})'

This class has both a .comment attribute, so that it can carry an overall comment for the mapping, and a .comments attribute containing per-key comments. It also allows adding comments for keys in nested dicts, by specifying the key path as a tuple. E.g. comments={('c', 'd'): 'comment'} allows specifying a comment for the key 'd' in the nested dict at 'c'. When getting items from CommentedMapping, if the item's value is a dict/Mapping, it is also wrapped in a CommentedMapping in such a way that preserves its comments. This is useful for recursive calls into the YAML representer for nested structures.
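To make those semantics concrete, here is a small self-contained demo. It uses a condensed sketch of the class (only the methods needed for the demo, and named CommentedMappingSketch to avoid confusion with the full class above):

```python
from collections.abc import Mapping


class CommentedMappingSketch:
    """Condensed illustration of the comment-lookup semantics."""
    def __init__(self, d, comment=None, comments=None):
        self.mapping = d
        self.comment = comment
        self.comments = comments if comments is not None else {}

    def get_comment(self, *path):
        if not path:
            return self.comment
        if len(path) == 1 and path[0] in self.comments:
            return self.comments[path[0]]
        return self.comments.get(path)

    def __getitem__(self, item):
        val = self.mapping[item]
        if isinstance(val, Mapping) and not isinstance(val, CommentedMappingSketch):
            # re-key nested comments relative to the sub-mapping
            sub = {k[1:]: v for k, v in self.comments.items()
                   if isinstance(k, tuple) and len(k) > 1 and k[0] == item}
            val = CommentedMappingSketch(val, comment=self.get_comment(item),
                                         comments=sub)
        return val


cm = CommentedMappingSketch(
    {'a': 1, 'c': {'d': 3}},
    comment='my commented dict',
    comments={'a': 'a comment', ('c', 'd'): 'd comment'})

print(cm.get_comment())          # 'my commented dict'
print(cm.get_comment('a'))       # 'a comment'
print(cm['c'].get_comment('d'))  # 'd comment' -- nested comments survive
```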

Next we need to implement a custom YAML Dumper, which takes care of the full process of serializing an object to YAML. A Dumper is a complicated class that's composed from four other classes: an Emitter, a Serializer, a Representer, and a Resolver. Of these we only have to implement the first three; Resolvers are more concerned with, e.g., how implicit scalars like 1 get resolved to the correct type, as well as with determining the default tags for various values, so they're not really involved here.

First we implement the representer. The representer is responsible for recognizing different Python types and mapping them to their appropriate nodes in the native YAML data structure/representation graph. Namely, these include scalar nodes, sequence nodes, and mapping nodes. For example, the base Representer class includes a representer for Python dicts which converts them to a MappingNode (each item in the dict in turn consists of a pair of ScalarNodes, one for the key and one for the value).
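As a quick illustration of that mapping, PyYAML's represent_data method returns the node graph for a value:

```python
from yaml.nodes import MappingNode, ScalarNode
from yaml.representer import SafeRepresenter

rep = SafeRepresenter()
node = rep.represent_data({'age': 34})

# a dict becomes a MappingNode; its value is a list of (key, value) node pairs
key_node, value_node = node.value[0]
print(type(node).__name__)               # MappingNode
print(key_node.value, value_node.value)  # age 34  (node values are strings)
```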

In order to attach comments to entire mappings, as well as to each key in a mapping, we introduce two new Node types which are not formally part of the YAML specification:

from yaml.nodes import Node, ScalarNode, MappingNode

class CommentedNode(Node):
    """Dummy base class for all nodes with attached comments."""


class CommentedScalarNode(ScalarNode, CommentedNode):
    def __init__(self, tag, value, start_mark=None, end_mark=None, style=None,
                 comment=None):
        super().__init__(tag, value, start_mark, end_mark, style)
        self.comment = comment


class CommentedMappingNode(MappingNode, CommentedNode):
    def __init__(self, tag, value, start_mark=None, end_mark=None,
                 flow_style=None, comment=None, comments=None):
        super().__init__(tag, value, start_mark, end_mark, flow_style)
        self.comment = comment
        self.comments = comments if comments is not None else {}

We then add a CommentedRepresenter, which includes code for representing a CommentedMapping as a CommentedMappingNode. In fact, it just reuses the base class's code for representing a mapping, but converts the returned MappingNode to a CommentedMappingNode. It also converts each key from a ScalarNode to a CommentedScalarNode. We base it on SafeRepresenter here since I don't need serialization of arbitrary Python objects:

from yaml.representer import SafeRepresenter

class CommentedRepresenter(SafeRepresenter):
    def represent_commented_mapping(self, data):
        node = super().represent_dict(data)
        comments = {k: data.get_comment(k) for k in data}
        value = []
        for k, v in node.value:
            if k.value in comments:
                k = CommentedScalarNode(
                        k.tag, k.value,
                        k.start_mark, k.end_mark, k.style,
                        comment=comments[k.value])
            value.append((k, v))

        node = CommentedMappingNode(
            node.tag,
            value,
            flow_style=False,  # commented dicts must be in block style
                               # this could be implemented differently for flow-style
                               # maps, but for my case I only want block-style, and
                               # it makes things much simpler
            comment=data.get_comment(),
            comments=comments
        )
        return node

    yaml_representers = SafeRepresenter.yaml_representers.copy()
    yaml_representers[CommentedMapping] = represent_commented_mapping

Next we need to implement a subclass of Serializer. The serializer is responsible for walking the representation graph of nodes and, for each node, outputting one or more events to the emitter, which is a complicated (and sometimes difficult to follow) state machine that receives a stream of events and outputs the appropriate YAML markup for each event (e.g. there is a MappingStartEvent which, when received, will output a { if it's a flow-style mapping, and/or add the appropriate level of indentation for subsequent output, up to the corresponding MappingEndEvent).
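To see what such an event stream looks like, PyYAML's yaml.parse can be used to list the events for a document:

```python
import yaml

events = [type(e).__name__ for e in yaml.parse('boby:\n  age: 34\n')]
print(events)
# A nested mapping produces StreamStart/DocumentStart, then a MappingStart
# (and its scalar keys/values) per mapping, then the matching end events.
```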

Point being, the new serializer must output events representing comments, so that the emitter can know when it needs to emit a comment. This is handled simply by adding a CommentEvent and emitting one every time a CommentedMappingNode or CommentedScalarNode is encountered in the representation:

from yaml import Event
from yaml.serializer import Serializer

class CommentEvent(Event):
    """
    Simple stream event representing a comment to be output to the stream.
    """
    def __init__(self, value, start_mark=None, end_mark=None):
        super().__init__(start_mark, end_mark)
        self.value = value

class CommentedSerializer(Serializer):
    def serialize_node(self, node, parent, index):
        if (node not in self.serialized_nodes and
                isinstance(node, CommentedNode) and
                not (isinstance(node, CommentedMappingNode) and
                     isinstance(parent, CommentedMappingNode))):
            # Emit CommentEvents, but only if the current node is not a
            # CommentedMappingNode nested in another CommentedMappingNode (in
            # which case we would have already emitted its comment via the
            # parent mapping)
            self.emit(CommentEvent(node.comment))

        super().serialize_node(node, parent, index)

Next, the Emitter needs to be subclassed to handle CommentEvents. This is perhaps the trickiest part, since, as I wrote, the emitter is a bit complex and fragile, and written in such a way that it's difficult to modify the state machine (I am tempted to rewrite it more clearly, but don't have time right now). So I experimented with a number of different solutions.

The key method here is Emitter.emit, which processes the event stream and calls "state" methods that perform some action depending on what state the machine is in, which is in turn affected by what events appear in the stream. An important realization is that the stream processing is suspended in many cases while waiting for more events to come in; this is what the Emitter.need_more_events method is responsible for. In some cases, before the current event can be handled, more events need to come in first. For example, in the case of MappingStartEvent at least 3 more events need to be buffered on the stream: the first key/value pair, and possibly the next key. The Emitter needs to know, before it can begin formatting a map, whether there are one or more items in the map, and possibly also the length of the first key/value pair. The number of events required before the current event can be handled is hard-coded in the need_more_events method.

The problem is that this does not account for the now-possible presence of CommentEvents on the event stream, which should not impact the processing of other events. Therefore we override the Emitter.need_events method to account for the presence of CommentEvents. E.g. if the current event is MappingStartEvent, and there are 3 subsequent events buffered, and one of those is a CommentEvent, we can't count it, so we'll need at minimum 4 events (in case the next one is one of the expected events in a mapping).

Finally, every time a CommentEvent is encountered on the stream, we forcibly break out of the current event processing loop to handle writing the comment, then pop the CommentEvent off the stream and continue as if nothing happened. This is the end result:

import textwrap
from yaml.emitter import Emitter

class CommentedEmitter(Emitter):
    def need_more_events(self):
        if self.events and isinstance(self.events[0], CommentEvent):
            # If the next event is a comment, always break out of the event
            # handling loop so that we divert it for comment handling
            return True
        return super().need_more_events()

    def need_events(self, count):
        # Hack-y: the minimal number of queued events needed to start
        # a block-level event is hard-coded, and does not account for
        # possible comment events, so here we increase the necessary
        # count for every comment event
        comments = [e for e in self.events if isinstance(e, CommentEvent)]
        return super().need_events(count + min(count, len(comments)))

    def emit(self, event):
        if self.events and isinstance(self.events[0], CommentEvent):
            # Write the comment, then pop it off the event stream and continue
            # as normal
            self.write_comment(self.events[0].value)
            self.events.pop(0)

        super().emit(event)

    def write_comment(self, comment):
        indent = self.indent or 0
        width = self.best_width - indent - 2  # 2 for the comment prefix '# '
        lines = ['# ' + line for line in textwrap.wrap(comment, width)]

        for line in lines:
            if self.encoding:
                line = line.encode(self.encoding)
            self.write_indent()
            self.stream.write(line)
            self.write_line_break()

I also experimented with different approaches to the implementation of write_comment. The Emitter base class has its own method (write_plain) which can handle writing text to the stream with appropriate indentation and line-wrapping. However, it's not quite flexible enough to handle something like comments, where each line needs to be prefixed with something like '# '. One technique I tried was monkey-patching the write_indent method to handle this case, but in the end it was too ugly. I found that simply using Python's built-in textwrap.wrap was sufficient for my case.
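For instance, wrapping a long comment and prefixing each line comes down to just:

```python
import textwrap

comment = 'long string ' * 10
width = 40  # roughly best_width minus indent and the '# ' prefix
lines = ['# ' + line for line in textwrap.wrap(comment, width)]
for line in lines:
    print(line)
```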

Next, we create the dumper by subclassing the existing SafeDumper but inserting our new classes into the MRO:

from yaml import SafeDumper

class CommentedDumper(CommentedEmitter, CommentedSerializer,
                      CommentedRepresenter, SafeDumper):
    """
    Extension of `yaml.SafeDumper` that supports writing `CommentedMapping`s with
    all comments output as YAML comments.
    """

Here's an example usage:

>>> import yaml
>>> d = CommentedMapping({
...     'a': 1,
...     'b': 2,
...     'c': {'d': 3},
... }, comment='my commented dict', comments={
...     'a': 'a comment',
...     'b': 'b comment',
...     'c': 'long string ' * 44,
...     ('c', 'd'): 'd comment'
... })
>>> print(yaml.dump(d, Dumper=CommentedDumper))
# my commented dict
# a comment
a: 1
# b comment
b: 2
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string
c:
  # d comment
  d: 3

I still haven't tested this solution very extensively, and it likely still contains bugs. I'll update it as I use it more and find corner-cases, etc.