有効数字で浮動小数点数を丸める

浮動小数点数の時系列データを圧縮するのに xor encoding という手法があるそうだ。

値が時系列に 15.5, 14.0625, 3.25, 8.625 というように並んでいたとする。

それぞれを倍精度浮動小数点で表すと、ビット列は下のようになる。

decimail double precision
15.5 01000000 00101111 00000000 00000000 00000000 00000000 00000000 00000000
14.0625 01000000 00101100 00100000 00000000 00000000 00000000 00000000 00000000
3.25 01000000 00001010 00000000 00000000 00000000 00000000 00000000 00000000
8.625 01000000 00100001 01000000 00000000 00000000 00000000 00000000 00000000

時系列データというのは、同じ値が続いたり、前後の値が近いという特徴がある。そのとき、近い数字はビット列の並びも近いとすると、前の値ビット列と XOR を取ると、ビットが立っているところが集中するようになる。

00000000 00000011 00100000 00000000 00000000 00000000 00000000 00000000
00000000 00100110 00100000 00000000 00000000 00000000 00000000 00000000
00000000 00101011 01000000 00000000 00000000 00000000 00000000 00000000

ビットが立っているところのデータだけ記録すれば、保存に必要なディスク容量が少なくなるだろうというアイデアです。ビットが立っているところだけ記録する方法は Gorilla の論文を参照してください。

しかし、この手法は本当にいいのだろうか?

浮動小数点数のバイナリデータをダラダラと吐き出すシステムもないわけではないのだろうが、ほとんどの場合10進数のテキストデータが渡されるのではなかろうか。10進数を浮動小数点数に厳密に変換することはできません。例えば、 3.14 を倍精度浮動小数点にしたときのビット列は

01000000 00001001 00011110 10111000 01010001 11101011 10000101 00011111

となります。これを10進数に戻すと 3.14000000000000012434… となります。

3.13 を倍精度浮動小数点にしたら、

01000000 00001001 00001010 00111101 01110000 10100011 11010111 00001010

となるのですが、3.14 と XOR をとったら

00000000 00000000 00010100 10000101 00100001 01000000 01010010 00010101

です。2進数でキリが悪い数字となっているので、値は近いにも関わらず、ビット列はそこまで一致していません。

そもそも、3.14, 3.13 は有効数字は3桁であり、倍精度浮動小数点は10進数で約16桁の精度なので、10進数をそのまま浮動小数点数に直すと無効な桁も表現してしまっていることになります。有効数字だけ残して、無効な桁を表現するビットを丸めてしまえば、もっと効果的にデータを圧縮できそうです。

3.14 の場合は、下のように有効数字を示す右側のビットを落としてしまってよいでしょう。

丸めなし 01000000 00001001 00011110 10111000 01010001 11101011 10000101 00011111
丸めあり 01000000 00001001 00100000 00000000 00000000 00000000 00000000 00000000

有効数字3桁で丸めた浮動小数点を10進数に戻すと、 3.140625 となります。これが2進数ではキリのよい数ということです。記事の最後にこのような丸めを実現するプログラム例を載せておきます。

テキストの 10進数 → 有効数字で丸めた浮動小数点 → xor encoding

という風に変換していけば有効数字を維持したまま時系列データを圧縮できるのではないかと考えました。

実際にやってみました。Gorilla の論文に載っている符号化は実装が面倒すぎたので、代わりに前の値との xor 値を並べたバイト列を圧縮してどの程度効果があるのか試した。

sar でとった 1秒おきの PC の CPU 使用率のデータで試してみたら、以下のような結果になりました。

サイズ [bytes]
もとのテキストデータ 70062
倍精度浮動小数点数に変換後 XOR 値をとって gzip 圧縮 29357
有効数字で丸めた浮動小数点数に変換後 XOR 値をとって gzip 圧縮 26887

一応、効果はあるようです。しかし、普通にテキストファイルを gzip 圧縮したら 18431 bytes となって、最も有効だった。PC の CPU の使用率は変動が激しすぎて、そもそも xor encoding が有効でないのだろう。滑らかな変動をするようなデータで試した方が良かったのかもしれない。


import argparse
import math
import re
import struct
import sys


class DoublePrecisionFloat:
    def __init__(self):
        self.bits = None

    @staticmethod
    def from_float(f):
        instance = DoublePrecisionFloat()
        instance.bits = struct.unpack('>Q', struct.pack('>d', f))[0]
        return instance

    @staticmethod
    def from_string(string):
        f = float(string)
        return DoublePrecisionFloat.from_float(f)

    @staticmethod
    def from_bits(bits):
        instance = DoublePrecisionFloat()
        instance.bits = bits
        return instance

    @property
    def raw_float(self):
        return struct.unpack('>d', self.bits.to_bytes(8, byteorder='big'))[0]

    def to_binstr(self):
        bb = self.bits.to_bytes(8, byteorder='big')
        return ' '.join(['{:08b}'.format(b) for b in bb])

    def to_hexstr(self):
        bb = self.bits.to_bytes(8, byteorder='big')
        hex2bytes = ['{:02x}{:02x}'.format(bb[i], bb[i + 1])
                     for i in range(0, len(bb), 2)]
        return ' '.join(hex2bytes)

    @staticmethod
    def __significant_bits(significant_figures, base):
        return int(math.ceil(
            significant_figures * math.log2(base)))

    # 11111111 11110000 00000000 ... 00000000
    # |<----------><------  fraction  ------>
    # |  exponent
    # sign
    __FILTER = int.from_bytes(
        b'\xFF\xF0\x00\x00\x00\x00\x00\x00',
        byteorder='big',
        signed=True)

    def round_off(self, significant_figures, base=10):
        # 0         0.1         1
        # |<-------------------||
        #        round off
        sig_bits = self.__significant_bits(significant_figures, base)
        # left side is padded with 1
        filt = self.__FILTER >> sig_bits
        new_bits = self.bits & filt
        return DoublePrecisionFloat.from_bits(new_bits)

    def round_up(self, significant_figures, base=10):
        # 0         0.1         1
        # ||------------------->|
        #        round up

        sig_bits = self.__significant_bits(significant_figures, base)
        # left side is padded with 1
        filt = self.__FILTER >> sig_bits

        # TODO: the case of overflow
        new_bits = (self.bits + (~filt)) & filt
        return DoublePrecisionFloat.from_bits(new_bits)

    def round(self, significant_figures, base=10):
        # 0         0.1         1
        # |<--------||--------->|
        #  round off   round up

        sig_bits = self.__significant_bits(significant_figures, base)
        filt = self.__FILTER >> sig_bits
        round_up_bit = 0x0010000000000000 >> (sig_bits + 1)
        new_bits = (self.bits + round_up_bit) & filt
        return DoublePrecisionFloat.from_bits(new_bits)

    def xor(self, other):
        return DoublePrecisionFloat.from_bits(self.bits ^ other.bits)


class FloatString:
    def __init__(self, string):
        self.string = string

    @property
    def significant_figures(self):
        # TODO: This code is ugly

        string = self.string

        # 0.0
        f = float(string)
        if f == 0.0:
            return 0

        # 0.0123
        pattern1 = r'[+-]?0\.0*(\d+)'
        match1 = re.search(pattern1, string)
        if match1:
            return len(match1.group(1))

        # 123.4 or 123.40
        pattern2 = r'[+-]?0*([1-9]+\d*\.\d+)'
        match2 = re.search(pattern2, string)
        if match2:
            return len(match2.group(1)) - 1

        # +1E+10 or -1e-10
        pattern3 = r'[+-]?(\d+)[eE][+-]\d+'
        match3 = re.search(pattern3, string)
        if match3:
            return len(match3.group(1))

        # +1.23E+10 or -1.23e-10
        pattern4 = r'[+-]?0*(\d\.\d+)[eE][+-]\d+'
        match4 = re.search(pattern4, string)
        if match4:
            return len(match4.group(1)) - 1

        # 1234
        pattern5 = r'[+-]?0*(\d+)'
        match5 = re.search(pattern5, string)
        if match5:
            return len(match5.group(1))

        raise ValueError("could not get significant figures: '%s'" % string)

    def to_rounded_float(self):
        d = DoublePrecisionFloat.from_string(self.string)
        sigfig = self.significant_figures
        return d.round(sigfig)

    def to_nonrounded_float(self):
        d = DoublePrecisionFloat.from_string(self.string)
        return d


def main():
    parser = argparse.ArgumentParser()
    out_formats = parser.add_mutually_exclusive_group()
    out_formats.add_argument('-b', action='store_true',
                             help='print results with binary strings')
    out_formats.add_argument('-x', action='store_true',
                             help='print results with hex strings')
    out_formats.add_argument('-f', action='store_true',
                             help='print results with decimal number')
    out_formats.add_argument('-n', action='store_true',
                             help='not round results')
    args = parser.parse_args()

    if args.b is True:
        write_func = lambda f: print(f.to_binstr())
    elif args.x is True:
        write_func = lambda f: print(f.to_hexstr())
    elif args.f is True:
        write_func = lambda f: print(str(f.raw_float))
    else:
        write_func = lambda f: sys.stdout.buffer.write(
            struct.pack('>d', f.raw_float))

    def stdin():
        try:
            while True:
                yield input().strip()
        except EOFError:
                pass

    for line in stdin():
        if args.n is not True:
            rounded = FloatString(line).to_rounded_float()
        else:
            rounded = FloatString(line).to_nonrounded_float()
        write_func(rounded)

if __name__ == '__main__':
    main()