有効数字で浮動小数点数を丸める
浮動小数点数の時系列データを圧縮するのに xor encoding という手法があるそうだ。
- Gorilla の論文 http://www.vldb.org/pvldb/vol8/p1816-teller.pdf
- 時系列データベースに関する基礎知識と時系列データの符号化方式について - クックパッド開発者ブログ
値が時系列に 15.5, 14.0625, 3.25, 8.625 というように並んでいたとする。
それぞれを倍精度浮動小数点で表すと、ビット列は下のようになる。
decimail | double precision |
---|---|
15.5 | 01000000 00101111 00000000 00000000 00000000 00000000 00000000 00000000 |
14.0625 | 01000000 00101100 00100000 00000000 00000000 00000000 00000000 00000000 |
3.25 | 01000000 00001010 00000000 00000000 00000000 00000000 00000000 00000000 |
8.625 | 01000000 00100001 01000000 00000000 00000000 00000000 00000000 00000000 |
時系列データというのは、同じ値が続いたり、前後の値が近いという特徴がある。そのとき、近い数字はビット列の並びも近いとすると、前の値ビット列と XOR を取ると、ビットが立っているところが集中するようになる。
00000000 00000011 00100000 00000000 00000000 00000000 00000000 00000000
00000000 00100110 00100000 00000000 00000000 00000000 00000000 00000000
00000000 00101011 01000000 00000000 00000000 00000000 00000000 00000000
ビットが立っているところのデータだけ記録すれば、保存に必要なディスク容量が少なくなるだろうというアイデアです。ビットが立っているところだけ記録する方法は Gorilla の論文を参照してください。
しかし、この手法は本当にいいのだろうか?
浮動小数点数のバイナリデータをダラダラと吐き出すシステムもないわけではないのだろうが、ほとんどの場合10進数のテキストデータが渡されるのではなかろうか。10進数を浮動小数点数に厳密に変換することはできません。例えば、 3.14 を倍精度浮動小数点にしたときのビット列は
01000000 00001001 00011110 10111000 01010001 11101011 10000101 00011111
となります。これを10進数に戻すと 3.14000000000000012434… となります。
3.13 を倍精度浮動小数点にしたら、
01000000 00001001 00001010 00111101 01110000 10100011 11010111 00001010
となるのですが、3.14 と XOR をとったら
00000000 00000000 00010100 10000101 00100001 01000000 01010010 00010101
です。2進数でキリが悪い数字となっているので、値は近いにも関わらず、ビット列はそこまで一致していません。
そもそも、3.14, 3.13 は有効数字は3桁であり、倍精度浮動小数点は10進数で約16桁の精度なので、10進数をそのまま浮動小数点数に直すと無効な桁も表現してしまっていることになります。有効数字だけ残して、無効な桁を表現するビットを丸めてしまえば、もっと効果的にデータを圧縮できそうです。
3.14 の場合は、下のように有効数字を示す右側のビットを落としてしまってよいでしょう。
丸めなし 01000000 00001001 00011110 10111000 01010001 11101011 10000101 00011111
丸めあり 01000000 00001001 00100000 00000000 00000000 00000000 00000000 00000000
有効数字3桁で丸めた浮動小数点を10進数に戻すと、 3.140625 となります。これが2進数ではキリのよい数ということです。記事の最後にこのような丸めを実現するプログラム例を載せておきます。
テキストの 10進数 → 有効数字で丸めた浮動小数点 → xor encoding
という風に変換していけば有効数字を維持したまま時系列データを圧縮できるのではないかと考えました。
実際にやってみました。Gorilla の論文に載っている符号化は実装が面倒すぎたので、代わりに前の値との xor 値を並べたバイト列を圧縮してどの程度効果があるのか試した。
sar でとった 1秒おきの PC の CPU 使用率のデータで試してみたら、以下のような結果になりました。
サイズ [bytes] | |
---|---|
もとのテキストデータ | 70062 |
倍精度浮動小数点数に変換後 XOR 値をとって gzip 圧縮 | 29357 |
有効数字で丸めた浮動小数点数に変換後 XOR 値をとって gzip 圧縮 | 26887 |
一応、効果はあるようです。しかし、普通にテキストファイルを gzip 圧縮したら 18431 bytes となって、最も有効だった。PC の CPU の使用率は変動が激しすぎて、そもそも xor encoding が有効でないのだろう。滑らかな変動をするようなデータで試した方が良かったのかもしれない。
import argparse import math import re import struct import sys class DoublePrecisionFloat: def __init__(self): self.bits = None @staticmethod def from_float(f): instance = DoublePrecisionFloat() instance.bits = struct.unpack('>Q', struct.pack('>d', f))[0] return instance @staticmethod def from_string(string): f = float(string) return DoublePrecisionFloat.from_float(f) @staticmethod def from_bits(bits): instance = DoublePrecisionFloat() instance.bits = bits return instance @property def raw_float(self): return struct.unpack('>d', self.bits.to_bytes(8, byteorder='big'))[0] def to_binstr(self): bb = self.bits.to_bytes(8, byteorder='big') return ' '.join(['{:08b}'.format(b) for b in bb]) def to_hexstr(self): bb = self.bits.to_bytes(8, byteorder='big') hex2bytes = ['{:02x}{:02x}'.format(bb[i], bb[i + 1]) for i in range(0, len(bb), 2)] return ' '.join(hex2bytes) @staticmethod def __significant_bits(significant_figures, base): return int(math.ceil( significant_figures * math.log2(base))) # 11111111 11110000 00000000 ... 00000000 # |<----------><------ fraction ------> # | exponent # sign __FILTER = int.from_bytes( b'\xFF\xF0\x00\x00\x00\x00\x00\x00', byteorder='big', signed=True) def round_off(self, significant_figures, base=10): # 0 0.1 1 # |<-------------------|| # round off sig_bits = self.__significant_bits(significant_figures, base) # left side is padded with 1 filt = self.__FILTER >> sig_bits new_bits = self.bits & filt return DoublePrecisionFloat.from_bits(new_bits) def round_up(self, significant_figures, base=10): # 0 0.1 1 # ||------------------->| # round up sig_bits = self.__significant_bits(significant_figures, base) # left side is padded with 1 filt = self.__FILTER >> sig_bits # TODO: the case of overflow new_bits = (self.bits + (~filt)) & filt return DoublePrecisionFloat.from_bits(new_bits) def round(self, significant_figures, base=10): # 0 0.1 1 # |<--------||--------->| # round off round up sig_bits = self.__significant_bits(significant_figures, base) filt = self.__FILTER >> sig_bits round_up_bit = 0x0010000000000000 >> (sig_bits + 1) new_bits = (self.bits + round_up_bit) & filt return DoublePrecisionFloat.from_bits(new_bits) def xor(self, other): return DoublePrecisionFloat.from_bits(self.bits ^ other.bits) class FloatString: def __init__(self, string): self.string = string @property def significant_figures(self): # TODO: This code is ugly string = self.string # 0.0 f = float(string) if f == 0.0: return 0 # 0.0123 pattern1 = r'[+-]?0\.0*(\d+)' match1 = re.search(pattern1, string) if match1: return len(match1.group(1)) # 123.4 or 123.40 pattern2 = r'[+-]?0*([1-9]+\d*\.\d+)' match2 = re.search(pattern2, string) if match2: return len(match2.group(1)) - 1 # +1E+10 or -1e-10 pattern3 = r'[+-]?(\d+)[eE][+-]\d+' match3 = re.search(pattern3, string) if match3: return len(match3.group(1)) # +1.23E+10 or -1.23e-10 pattern4 = r'[+-]?0*(\d\.\d+)[eE][+-]\d+' match4 = re.search(pattern4, string) if match4: return len(match4.group(1)) - 1 # 1234 pattern5 = r'[+-]?0*(\d+)' match5 = re.search(pattern5, string) if match5: return len(match5.group(1)) raise ValueError("could not get significant figures: '%s'" % string) def to_rounded_float(self): d = DoublePrecisionFloat.from_string(self.string) sigfig = self.significant_figures return d.round(sigfig) def to_nonrounded_float(self): d = DoublePrecisionFloat.from_string(self.string) return d def main(): parser = argparse.ArgumentParser() out_formats = parser.add_mutually_exclusive_group() out_formats.add_argument('-b', action='store_true', help='print results with binary strings') out_formats.add_argument('-x', action='store_true', help='print results with hex strings') out_formats.add_argument('-f', action='store_true', help='print results with decimal number') out_formats.add_argument('-n', action='store_true', help='not round results') args = parser.parse_args() if args.b is True: write_func = lambda f: print(f.to_binstr()) elif args.x is True: write_func = lambda f: print(f.to_hexstr()) elif args.f is True: write_func = lambda f: print(str(f.raw_float)) else: write_func = lambda f: sys.stdout.buffer.write( struct.pack('>d', f.raw_float)) def stdin(): try: while True: yield input().strip() except EOFError: pass for line in stdin(): if args.n is not True: rounded = FloatString(line).to_rounded_float() else: rounded = FloatString(line).to_nonrounded_float() write_func(rounded) if __name__ == '__main__': main()