Skip to article frontmatter | Skip to article content
# imports

import altair as alt
import datetime
import ipywidgets as widgets
import json
import numpy as np
import pandas as pd
import wordfreq as wf

from collections import defaultdict
from functools import cache

# Sanity check that the import cell ran.
print("Imports loaded successfully")
Imports loaded successfully
# Scratch cell. NOTE(review): the bare "5" below appears to be a captured
# cell output (presumably a+b) rather than intended source; as a literal
# expression statement it is harmless if this file is executed.
a = 2
b = 3

5

class Word:
    """A single token tagged with its entity type, detected language, and
    corpus frequency (via wordfreq)."""

    # Russian alphabet used for cheap language detection.
    _CYRILLIC = frozenset("абвгдеёжзийклмнопрстуфхцчшщъыьэюя")

    def __init__(self, word, _type):
        self.word = word
        self.type = _type
        self.lang = self.eval_lang(self.word)
        self.freq = self.eval_freq(self.word, self.lang)

    @staticmethod
    def eval_lang(word):
        """Return "ru" if the word contains any Cyrillic letter, else "en"."""
        # Set intersection is one pass over the word instead of the original
        # any()-scan over all 33 alphabet letters per character.
        if Word._CYRILLIC & set(word.lower()):
            return "ru"
        return "en"

    @staticmethod
    def eval_freq(word, language):
        """Look up the word's relative frequency in the given language corpus."""
        return wf.word_frequency(word, language)

    def __str__(self):
        return self.word


class Message:
    """One chat message: timestamp plus raw and cleaned token lists.

    `strings` keeps every whitespace-separated token as typed; `words`
    holds lowercased tokens with punctuation removed/normalized. Both
    contain Word instances.
    """

    # Removed from anywhere inside a token (after the replacements below).
    characters_to_remove = r'!@#$%^&*()_+=/?\|][{}<>;"~`—–«»'
    # Stripped only from a token's edges, so words like "don't" keep the apostrophe.
    border_characters = "'-:"
    # Applied in order before removal: separators become spaces, curly
    # quotes and guillemets are normalized.
    replacement_pairs = [
        ("\\", " "),
        ("/", " "),
        ("|", " "),
        ("…", "..."),
        (".", " "),
        (",", " "),
        ("”", '"'),
        ("“", '"'),
        ("’", "'"),
        ("‘", "'"),
        ("»", '"'),
        ("«", '"'),
    ]

    def __init__(self, id, unixtime, text_entities):
        """Parse one exported message from its id, unix timestamp, and
        text_entities list (dicts with "text" and "type" keys)."""
        self.id = id
        self.time = datetime.datetime.fromtimestamp(int(unixtime))
        self.strings = []  # raw tokens, as Word objects
        self.words = []    # cleaned tokens, as Word objects

        self.__process_text(text_entities)

    def __process_text(self, text_entities):
        # translate() performs all single-character deletions in one
        # C-level pass instead of a chain of .replace(ch, '') calls.
        removal_table = str.maketrans("", "", Message.characters_to_remove)

        def filtered(string):
            string = string.lower()

            for old, new in Message.replacement_pairs:
                string = string.replace(old, new)

            return string.translate(removal_table)

        for entity in text_entities:
            for token in entity["text"].split():
                self.strings.append(Word(token, entity["type"]))

            for token in filtered(entity["text"]).split():
                # str.strip removes any run of border characters from both
                # ends, matching the original two while-loops exactly.
                token = token.strip(Message.border_characters)

                if token == "":
                    continue

                self.words.append(Word(token, entity["type"]))

    def __str__(self):
        # BUG FIX: str.join requires actual str instances; the original
        # passed Word objects directly and raised TypeError.
        raw = " ".join(str(word) for word in self.strings)
        cleaned = " ".join(str(word) for word in self.words)
        return f"[{self.time}] {raw}\n{cleaned}"


class Call:
    # Placeholder: call metadata is not parsed yet (constructed by
    # User._add_call but carries no data).
    def __init__(self):
        pass


class User:
    """One chat participant, accumulating parsed messages and calls."""

    def __init__(self, id, name):
        self.id = id
        self.name = name
        self.messages = []  # Message instances
        self.calls = []     # Call instances

    def _add_message(self, json_object):
        """Parse one exported message object and append it to self.messages."""
        # Older exports carry a plain "text" field instead of
        # "text_entities"; normalize to the entity schema first.
        if "text_entities" not in json_object:
            json_object["text_entities"] = self.to_json(json_object["text"])

        self.messages.append(Message(json_object["id"], json_object["date_unixtime"], json_object["text_entities"]))

    def _add_call(self, json_object):
        # BUG FIX: the original did `self.calls = Call()`, replacing the
        # list created in __init__ with a single Call; append instead,
        # mirroring _add_message.
        self.calls.append(Call())

    @staticmethod
    def to_json(string):
        """Wrap a bare text string in the text_entities schema."""
        return [{"text": string, "type": "plain"}]


class Chat:
    """A personal chat assembled from one or more uploaded JSON exports.

    `me` is the exporting user, `you` the chat partner; both are instances
    of `user_class` (defaults to User).
    """

    def __init__(self, *uploaders, user_class=User):
        self.id = None   # chat id as a string, taken from the first loaded file
        self.me = None
        self.you = None

        for uploader in uploaders:
            # Skip uploaders where no file was selected.
            if len(uploader.value) != 0:
                data = self.__load_data(uploader)
                self.__process_messages(user_class, data)

    def __load_data(self, uploader):
        """Decode the uploaded JSON and validate that it is a personal chat.

        Raises FileNotFoundError when the export is not a personal_chat
        (kept for backward compatibility with existing callers).
        """
        raw_data = json.loads(uploader.value[list(uploader.value.keys())[0]]["content"])

        if "type" in raw_data and raw_data["type"] != "personal_chat":
            raise FileNotFoundError("File uploaded is not from a personal_chat")

        if self.id is None:
            self.id = str(raw_data["id"])

        return raw_data

    def __process_messages(self, user_class, raw_data):
        def user(id):
            # Route a from_id to the matching participant.
            if id.endswith(self.id):
                return self.you
            return self.me

        if raw_data is None:
            raise FileNotFoundError("No data loaded")

        for json_object in raw_data["messages"]:
            if "type" not in json_object or json_object["type"] == "message":
                id = json_object["from_id"]

                # BUG FIX: the original recreated a participant whenever
                # *either* slot was still empty, clobbering an already
                # populated user together with its accumulated messages.
                # Only fill the slot this message actually belongs to.
                if id.endswith(self.id):
                    if self.you is None:
                        self.you = user_class(self.id, json_object["from"])
                elif self.me is None:
                    # from_id is assumed to look like "userNNNN" — strip the
                    # 4-character prefix. TODO confirm for all export formats.
                    self.me = user_class(id[4:], json_object["from"])

                user(id)._add_message(json_object)
            elif json_object["type"] == "service":
                # Service entries (calls, pins, ...) are intentionally skipped.
                pass
            else:
                print("[WARNING] unknown object found")
                display(json_object)


class module_WordCounts:
    """Mixin for User: word-count statistics over self.messages.

    NOTE(review): @cache keys on self, so results go stale if messages are
    added after the first call, and cached instances are never freed (B019).
    Behavior kept as-is to preserve the existing interface.
    """

    @cache
    def __word_counts(self, attribute):
        """Build a (word, count) DataFrame from each message's `attribute`
        list ("words" or "strings") in first-seen order."""
        counts = defaultdict(int)

        for message in self.messages:
            for word in getattr(message, attribute):
                counts[word.word] += 1

        return pd.DataFrame({"word": list(counts), "count": list(counts.values())})

    @cache
    def word_counts_sorted(self, cleaned=True):
        """Counts sorted by descending frequency."""
        return self.word_counts_alpha(cleaned).sort_values("count", ascending=False)

    @cache
    def word_counts_alpha(self, cleaned=True):
        """Counts sorted alphabetically by word.

        BUG FIX: sort_index() on the freshly reset integer index was a
        no-op; sort by the "word" column as the method name promises.
        """
        attribute = "words" if cleaned else "strings"
        return self.__word_counts(attribute).sort_values("word")

    @cache
    def unique_words(self, cleaned=True):
        """Number of distinct words.

        BUG FIX: DataFrame.size is rows * columns, which double-counted
        (two columns); len() gives the number of rows.
        """
        return len(self.word_counts_alpha(cleaned))

    @cache
    def total_words(self, cleaned=True):
        """Total token count."""
        return int(self.word_counts_alpha(cleaned)["count"].sum())

    @cache
    def word_frequencies(self):
        # Not implemented yet.
        pass



class module_TimeSeries:
    """Mixin for User: per-word time series built from self.messages."""

    @cache
    def timeseries(self):
        """Long-format DataFrame with one row per distinct
        (name, message id, word, lang, time) key, carrying the occurrence
        count and the word's corpus frequency."""
        dictionary = defaultdict(lambda: [0, None])

        for message in self.messages:
            for word in message.words:
                key = (self.name, message.id, word.word, word.lang, message.time)
                entry = dictionary[key]
                entry[0] += 1
                if entry[1] is None:
                    # Corpus frequency is constant per word; record it once.
                    entry[1] = word.freq

        df = pd.DataFrame.from_dict(dictionary, orient="index", columns=["count", "freq"])
        df.index = pd.MultiIndex.from_tuples(df.index, names=["name", "id", "word", "lang", "time"])
        return df.reset_index()

    @cache
    def timebins(self, timebin, func=np.sum, exclude=("id",)):
        """Aggregate the time series into bins of width `timebin`
        (a pandas offset alias such as "1D" or "5Y") with `func`,
        dropping the columns in `exclude` first.

        BUG FIX: the default `exclude` is now a tuple — the original list
        default was mutable (shared across calls) and unhashable, so any
        explicit `exclude=` call crashed under @cache.
        """
        cols = [col for col in ["name", "id", "word", "lang", "freq"] if col not in exclude]
        df = (self.timeseries()
              .drop(columns=list(exclude))
              .groupby([pd.Grouper(key="time", freq=timebin), *cols])
              .agg(func)
              .reset_index())
        # Share of this word among all words in the same time bin.
        df["rel_freq"] = df["count"] / df.groupby("time")["count"].transform("sum")
        # Over-representation vs. the language baseline (inf when freq == 0).
        df["overrep"] = df["rel_freq"] / df["freq"]
        return df


class AllModules(User, module_WordCounts, module_TimeSeries):
    # Concrete participant type: User's data plus both analysis mixins.
    pass
# File-upload widget for a Telegram JSON export (single file).
telegram_uploader = widgets.FileUpload(
    accept='.json',
    multiple=False
)

display(telegram_uploader)
Loading...
# File-upload widget for a VK JSON export (single file).
vk_uploader = widgets.FileUpload(
    accept='.json',
    multiple=False
)

display(vk_uploader)
Loading...
# Toggle deciding whose messages the analysis cells below use.
user_choice = widgets.ToggleButtons(
    options=["mine", "not mine"],
    description="Whose chat to analyze?",
    disabled=False,
    tooltips=["Your chat", "Other person's chat"],
)

display(user_choice)
Loading...
# Build the chat from whichever uploader received a file; AllModules gives
# each participant the word-count and time-series mixins.
chat = Chat(telegram_uploader, vk_uploader, user_class=AllModules)
display(chat.me.word_counts_sorted())
Loading...
# Pick whose messages to analyze based on the toggle above.
if chat.me != None and chat.you != None:
    person = chat.me
    if user_choice.value != "mine":
        person = chat.you

    display(person.timeseries())
if chat.me != None and chat.you != None:
    # 5-year bins, ranked by how over-represented each word is compared to
    # its baseline corpus frequency.
    tb = person.timebins("5Y")
    tb= tb.sort_values("overrep", ascending=False)
    display(tb)
# NOTE(review): `tb` only exists when both participants were loaded — this
# line raises NameError otherwise; to_excel also needs an Excel engine
# (e.g. openpyxl) installed — TODO confirm.
tb.to_excel("test.xlsx")
# Combine both participants' daily bins into one frame for charting.
source = pd.concat([chat.me.timebins("1D"), chat.you.timebins("1D")]).reset_index(drop=True)
display(type(source))
display(source)

# Let Altair embed more than its default 5000-row limit.
alt.data_transformers.disable_max_rows()
Loading...
# Brush selection over the x (time) axis, shared between detail and overview.
interval = alt.selection_interval(encodings=["x"])
# Search box bound to the "word" field; defaults to the Russian word "я".
word_selection = alt.selection_point(
    fields=['word'],
    value='я',
    empty=False,
    bind=alt.binding(
        input='search',
        placeholder="beloved",
        name='Word selection: ',
    )
)

# One fixed color per chat participant.
color_scale = alt.Scale(domain=[chat.me.name, chat.you.name], range=["#8a1fb4", "#77e3af"])

#columns = source[["time", "name", "word", "lang", "count"]]

# Bars: daily totals of the selected word, summed per (day, person).
chart_base = alt.Chart(source).mark_bar().encode(
    x=alt.X("time:T").title("Date").axis(None),
    y=alt.Y("total_count:Q").impute(value=0).title("Word Count"),
    color=alt.Color("name:N").scale(color_scale).legend(None),
    opacity=alt.value(0.75),
    tooltip=["time:T", "name:N", "total_count:Q"]
).add_params(word_selection).transform_filter(word_selection).transform_aggregate(
    total_count="sum(count)",
    groupby=["time", "name"]
)

# Area: centered rolling mean of the same daily totals (window of
# 14 rows either side, per transform_window frame=[-14, 14]).
rolling_base = alt.Chart(source).mark_area(color="green", line={"color": "green", "opacity": 0.7}).encode(
    x=alt.X('time:T').title("Date").axis(None),
    y=alt.Y('rolling_mean:Q').impute(value=0),
    color=alt.Color("name:N"),
    opacity=alt.value(0.75)
).add_params(word_selection).transform_filter(word_selection).transform_aggregate(
    total_count="sum(count)",
    groupby=["time", "name"]
).transform_window(
    rolling_mean='mean(total_count)',
    frame=[-14, 14]
)

# Near-invisible 1px strip reused as a spacer between the mirrored halves.
middle = chart_base.encode(
    x=alt.X('time:T'),
).mark_text().properties(height=1)

bases = [chart_base, rolling_base, middle]

# Detail views: x-domain driven by the interval brush.
chart, rolling_mean, middle = [base.encode(
    x=alt.X('time:T', scale=alt.Scale(domain=interval.to_dict())).title("Date").axis(None)
).properties(width=1000, height=200) for base in bases]
# Overview strips that carry the interval brush itself.
chart_view, rolling_mean_view, middle_view = [base.add_params(interval).properties(width=1000, height=50) for base in bases]

# Final layout: my counts on top, partner's below with a reversed y-axis
# (mirrored), spacer strip in between, overview strip at the bottom.
display((chart.transform_filter(alt.datum.name == chat.me.name) + rolling_mean.transform_filter(alt.datum.name == chat.me.name))
      & middle.encode(x=alt.X('time:T', scale=alt.Scale(domain=interval.to_dict())).title(None),
                      y=alt.Y("total_count:Q").title(None).axis(None)).properties(height=1)
      & (chart.transform_filter(alt.datum.name == chat.you.name).encode(
          x=alt.X("time:T").title("Date").axis(None),
          y=alt.Y("total_count:Q").impute(value=0).title("Word Count").sort('descending')
      ) + rolling_mean.transform_filter(alt.datum.name == chat.you.name))
       & middle_view.encode(y=alt.Y("total_count:Q").title(None).axis(None)).properties(height=20))
Loading...
import pyvista as pv

class MyCustomRoutine:
    """Re-generates a PyVista sphere whenever one of its parameters changes
    and copies the result into the mesh given at construction time."""

    def __init__(self, mesh):
        # Mesh object that the plotter displays; updated in place.
        self.output = mesh
        # Per-axis center coordinates, kept separately so a single-axis
        # slider update can rebuild the full center tuple.
        self.center = {"x": 0, "y": 0, "z": 0}
        # Keyword arguments forwarded to pv.Sphere on every update.
        self.kwargs = {
            'center': (0, 0, 0),
        }

    def __call__(self, param, value):
        """Set one parameter ("x"/"y"/"z" move the center; anything else is
        passed straight to pv.Sphere) and refresh the mesh."""
        if param in self.center:
            self.center[param] = value
            self.kwargs["center"] = tuple(self.center.values())
        else:
            self.kwargs[param] = value
        self.update()

    def update(self):
        """Rebuild the sphere with the current kwargs and push it into the
        displayed mesh."""
        self.output.copy_from(pv.Sphere(**self.kwargs))
# Interactive demo: a sphere whose center is moved live by three sliders.
starting_mesh = pv.Sphere()
engine = MyCustomRoutine(starting_mesh)
p = pv.Plotter()
p.add_mesh(starting_mesh, show_edges=True)
# One slider per axis, laid out left-to-right along the bottom of the
# window; interaction_event='always' fires the callback continuously
# while the slider is dragged.
p.add_slider_widget(
    callback=lambda value: engine('x', float(value)),
    rng=[-5, 5],
    value=0,
    title='x',
    pointa=(0.025, 0.1),
    pointb=(0.31, 0.1),
    style='modern',
    interaction_event='always'
)
p.add_slider_widget(
    callback=lambda value: engine('y', float(value)),
    rng=[-5, 5],
    value=0,
    title='y',
    pointa=(0.35, 0.1),
    pointb=(0.64, 0.1),
    style='modern',
    interaction_event='always'
)
p.add_slider_widget(
    callback=lambda value: engine('z', float(value)),
    rng=[-5, 5],
    value=0,
    title='z',
    pointa=(0.67, 0.1),
    pointb=(0.98, 0.1),
    style='modern',
    interaction_event='always'
)
p.show()