Source code for coinflip._randtests.complexity

from copy import copy
from dataclasses import dataclass
from math import floor
from math import sqrt
from typing import Dict
from typing import List
from typing import Sequence

import altair as alt
import pandas as pd
from scipy.stats import chisquare
from typing_extensions import Literal

from coinflip._randtests.common.collections import Bins
from coinflip._randtests.common.core import *
from coinflip._randtests.common.result import TestResult
from coinflip._randtests.common.result import make_chisquare_table
from coinflip._randtests.common.result import plot_chi2_dist
from coinflip._randtests.common.result import smartround
from coinflip._randtests.common.testutils import rawblocks
from coinflip._randtests.common.typing import Float
from coinflip._randtests.common.typing import Integer

__all__ = ["linear_complexity"]


# TODO - remove hardcoded degrees of freedom
#      - dynamically calculate these probabilities
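# NIST SP 800-22 §2.10 bins the per-block statistic into seven categories,
# giving K = 6 degrees of freedom; the category probabilities π_0..π_6 are: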
df = 6
probabilities = [
    0.010417,
    0.03125,
    0.125,
    0.5,
    0.25,
    0.0625,
    0.020833,
]


@randtest()
def linear_complexity(series, heads, tails, ctx, blocksize=None):
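    """NIST linear complexity test

    The linear complexity of each block (the length of the shortest LFSR that
    generates it, found via Berlekamp-Massey) is compared to the length expected
    of a random sequence, using a chi-square test on the binned deviations.
    """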
    n = len(series)

    # default to the first candidate blocksize that yields at least 200 blocks,
    # falling back to ⌊√n⌋ (minimum 2) for short sequences
    if not blocksize:
        for blocksize in [1000, 2500, 500, 5000]:
            nblocks = n // blocksize
            if 500 <= blocksize <= 5000 and nblocks >= 200:
                break
        else:
            blocksize = max(floor(sqrt(n)), 2)

    nblocks = n // blocksize

    set_task_total(ctx, nblocks + 3)

    failures = check_recommendations(
        ctx,
        {
            "n ≥ 1000000": n >= 1000000,
            "500 ≤ blocksize ≤ 5000": 500 <= blocksize <= 5000,
            "nblocks ≥ 200": nblocks >= 200,
        },
    )

    binary = series.map({heads: 1, tails: 0})

    advance_task(ctx)

    # expected mean linear complexity of a random M-bit block (NIST SP 800-22 §2.10):
    # μ = M/2 + (9 + (-1)^(M+1))/36 - (M/3 + 2/9)/2^M
    mean_expect = (
        blocksize / 2
        + (9 + (-1) ** (blocksize + 1)) / 36
        - (blocksize / 3 + 2 / 9) / 2 ** blocksize
    )
    expected_bincounts = [nblocks * prob for prob in probabilities]

    advance_task(ctx)

    # seven bins keyed by their integer centres -3..3, mirroring the NIST
    # categories for the per-block statistic (T ≤ -2.5 up to T > 2.5)
    variance_bins = Bins([-3, -2, -1, 0, 1, 2, 3])
    for block_tup in rawblocks(binary, blocksize):
        linear_complexity = berlekamp_massey(block_tup)
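        # NIST per-block statistic T_i = (-1)^M * (L_i - μ) + 2/9,
        # with M = blocksize, L_i = linear_complexity, μ = mean_expect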
        variance = (-1) ** blocksize * (linear_complexity - mean_expect) + 2 / 9
        variance_bins[variance] += 1

        advance_task(ctx)

    statistic, p = chisquare(list(variance_bins.values()), expected_bincounts)

    advance_task(ctx)

    return LinearComplexityTestResult(
        heads,
        tails,
        failures,
        statistic,
        p,
        blocksize,
        mean_expect,
        expected_bincounts,
        variance_bins,
    )


@dataclass
class LinearComplexityTestResult(TestResult):
    blocksize: Integer
    mean_expect: Float
    expected_bincounts: List[Float]
    variance_bins: Dict[Float, Integer]

    def _render(self):
        yield self._pretty_result("chi-square")

        f_mean_expect = smartround(self.mean_expect)

        table = make_chisquare_table(
            "deviations of shortest LSFR per block",
            "variance",
            self.variance_bins.keys(),
            self.expected_bincounts,
            self.variance_bins.values(),
            caption=f"from expected mean {f_mean_expect}",
        )
        yield table

    def plot_variance(self):
        df = pd.DataFrame(
            {
                "variance": self.variance_bins.keys(),
                "expected": self.expected_bincounts,
                "observed": self.variance_bins.values(),
            }
        )
        df = df.melt("variance", var_name="type", value_name="nblocks")

        f_mean_expect = smartround(self.mean_expect)

        chart = (
            alt.Chart(df)
            .mark_bar()
            .encode(
                alt.X("variance:O", title="Variance"),
                alt.Y("nblocks:Q", title="Number of blocks"),
                column=alt.Column(
                    "type:N",
                    title=None,
                ),
            )
            .properties(
                title=f"Variance from shortest LSFR length of {f_mean_expect} per block"
            )
        )

        return chart

    def plot_refdist(self):
        return plot_chi2_dist(self.statistic, df)


def berlekamp_massey(sequence: Sequence[Literal[0, 1]]) -> int:
    """Finds the length of the shortest LFSR that generates the sequence"""
    n = len(sequence)

    error_locator = [0 for _ in range(n)]
    error_locator[0] = 1

    error_locator_prev = copy(error_locator)

    min_size = 0  # of the LFSR
    nloops = -1  # since error_locator_prev and min_size were updated

    for i, seq_bit in enumerate(sequence):
        # discrepancy is the block bit XORed with the current LFSR's prediction
        discrepancy = seq_bit
        seq_window = reversed(sequence[i - min_size : i])
        errloc_window = error_locator[1 : min_size + 1]
        for seq_bit, errloc_bit in zip(seq_window, errloc_window):
            product = seq_bit & errloc_bit
            discrepancy = discrepancy ^ product

        if discrepancy:
            error_locator_temp = copy(error_locator)

            # adjust the error locator by the previous one to cancel the discrepancy
            recalc_bits = []
            for errloc_bit, prev_bit in zip(
                error_locator[i - nloops : n], error_locator_prev
            ):
                bit = errloc_bit ^ prev_bit
                recalc_bits.append(bit)
            error_locator[i - nloops : n] = recalc_bits

            if min_size <= i / 2:
                min_size = i + 1 - min_size
                nloops = i
                error_locator_prev = error_locator_temp

    return min_size
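

# A minimal usage sketch (not part of the original module): berlekamp_massey
# operates on an indexable sequence of 0/1 integers and returns its linear
# complexity. The two cases below follow directly from the algorithm: the
# all-zero sequence needs no LFSR, while a lone trailing 1 can only be produced
# by an LFSR as long as the sequence itself.
if __name__ == "__main__":
    assert berlekamp_massey((0, 0, 0, 0, 0, 0, 0, 0)) == 0
    assert berlekamp_massey((0, 0, 0, 0, 0, 0, 0, 1)) == 8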