# imports
import altair as alt
import datetime
import ipywidgets as widgets
import json
import numpy as np
import pandas as pd
import wordfreq as wf
from collections import defaultdict
from functools import cache
from IPython.display import display  # used explicitly in the cells below
print("Imports loaded successfully")
a = 2
b = 3
a + b  # sanity check: the cell should evaluate to 5
class Word:
    """A single token: its raw text, entity type, detected language, and corpus frequency."""

    def __init__(self, word, _type):
        self.word = word
        self.type = _type
        self.lang = self.eval_lang(self.word)
        self.freq = self.eval_freq(self.word, self.lang)

    @staticmethod
    def eval_lang(word):
        # Crude heuristic: any Cyrillic letter marks the word as Russian.
        if any(letter in word.lower() for letter in "абвгдеёжзийклмнопрстуфхцчшщъыьэюя"):
            return "ru"
        return "en"

    @staticmethod
    def eval_freq(word, language):
        return wf.word_frequency(word, language)

    def __str__(self):
        return self.word
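# A quick, illustrative sanity check (not part of the pipeline): eval_lang
# falls back to "en" whenever a word contains no Cyrillic letters.
print(Word("привет", "plain").lang)  # "ru"
print(Word("hello", "plain").lang)   # "en"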
class Message:
    characters_to_remove = r'!@#$%^&*()_+=/?\|][{}<>;"~`—–«»'
    border_characters = "'-:"
    replacement_pairs = [
        ("\\", " "),
        ("/", " "),
        ("|", " "),
        ("…", "..."),
        (".", " "),
        (",", " "),
        ("”", '"'),
        ("“", '"'),
        ("’", "'"),
        ("‘", "'"),
        ("»", '"'),
        ("«", '"'),
    ]

    def __init__(self, id, unixtime, text_entities):
        self.id = id
        self.time = datetime.datetime.fromtimestamp(int(unixtime))
        self.strings = []  # raw whitespace-split tokens
        self.words = []    # cleaned, lowercased tokens
        self.__process_text(text_entities)

    def __process_text(self, text_entities):
        def filtered(string):
            # Lowercase, normalize punctuation, then strip unwanted characters.
            string = string.lower()
            for pair in Message.replacement_pairs:
                string = string.replace(*pair)
            for char in Message.characters_to_remove:
                string = string.replace(char, '')
            return string

        def filtered2(string):
            # Trim quote/hyphen/colon characters from the word's edges.
            while any(string.startswith(char) for char in Message.border_characters):
                string = string[1:]
            while any(string.endswith(char) for char in Message.border_characters):
                string = string[:-1]
            return string

        for entity in text_entities:
            for string in entity["text"].split():
                self.strings.append(Word(string, entity["type"]))
            for word in filtered(entity["text"]).split():
                word = filtered2(word)
                if word == "":
                    continue
                self.words.append(Word(word, entity["type"]))

    def __str__(self):
        # Word objects must be converted to str before joining.
        return f"[{self.time}] {' '.join(str(s) for s in self.strings)}\n{' '.join(str(w) for w in self.words)}"
class Call:
    def __init__(self):
        pass
class User:
    def __init__(self, id, name):
        self.id = id
        self.name = name
        self.messages = []
        self.calls = []

    def _add_message(self, json_object):
        # Exports without "text_entities" carry bare text; wrap it in a plain entity.
        if "text_entities" not in json_object:
            json_object["text_entities"] = self.to_json(json_object["text"])
        self.messages.append(Message(json_object["id"], json_object["date_unixtime"], json_object["text_entities"]))

    def _add_call(self, json_object):
        self.calls.append(Call())  # append, rather than overwrite the list

    @staticmethod
    def to_json(string):
        return [{"text": string, "type": "plain"}]
class Chat:
    def __init__(self, *uploaders, user_class=User):
        self.id = None
        self.me = None
        self.you = None
        for uploader in uploaders:
            if len(uploader.value) != 0:
                data = self.__load_data(uploader)
                self.__process_messages(user_class, data)

    def __load_data(self, uploader):
        # ipywidgets 7 API; with ipywidgets 8 this would be
        # json.loads(uploader.value[0].content.tobytes())
        raw_data = json.loads(uploader.value[list(uploader.value.keys())[0]]["content"])
        if "type" in raw_data and raw_data["type"] != "personal_chat":
            raise ValueError("File uploaded is not from a personal_chat")
        if self.id is None:
            self.id = str(raw_data["id"])
        return raw_data

    def __process_messages(self, user_class, raw_data):
        def user(id):
            # Messages whose from_id ends with the chat id come from the other person.
            if id.endswith(self.id):
                return self.you
            return self.me

        if raw_data is None:
            raise ValueError("No data loaded")
        for json_object in raw_data["messages"]:
            if "type" not in json_object or json_object["type"] == "message":
                id = json_object["from_id"]
                if self.me is None or self.you is None:
                    if id.endswith(self.id):
                        self.you = user_class(self.id, json_object["from"])
                    else:
                        self.me = user_class(id[4:], json_object["from"])  # strip the "user" prefix
                user(id)._add_message(json_object)
            elif json_object["type"] == "service":
                pass  # service messages (calls etc.) are ignored for now
            else:
                print("[WARNING] unknown object found")
                display(json_object)
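# For reference, a minimal skeleton of the export JSON that Chat expects,
# inferred from the parsing code above (not an official schema):
# {
#     "id": 123456,
#     "type": "personal_chat",
#     "messages": [
#         {"id": 1, "type": "message", "from": "Alice", "from_id": "user123456",
#          "date_unixtime": "1577836800",
#          "text_entities": [{"text": "hi", "type": "plain"}]}
#     ]
# }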
class module_WordCounts:
    @cache
    def __word_counts(self, attribute):
        # Count occurrences of every distinct token in self.messages.
        dictionary = defaultdict(lambda: [0])
        for message in self.messages:
            for word in getattr(message, attribute):
                dictionary[word.word][0] += 1
        df = pd.DataFrame(dictionary).transpose().reset_index()
        df.columns = ["word", "count"]
        return df

    @cache
    def word_counts_sorted(self, cleaned=True):
        return self.word_counts_alpha(cleaned).sort_values("count", ascending=False)

    @cache
    def word_counts_alpha(self, cleaned=True):
        attribute = "words" if cleaned else "strings"
        # sort_values("word") gives the alphabetical order the name promises;
        # sort_index() was a no-op after reset_index().
        return self.__word_counts(attribute).sort_values("word")

    @cache
    def unique_words(self, cleaned=True):
        # len() counts rows; DataFrame.size would count rows * columns.
        return len(self.word_counts_alpha(cleaned))

    @cache
    def total_words(self, cleaned=True):
        return int(self.word_counts_alpha(cleaned)["count"].sum())

    @cache
    def word_frequencies(self):
        pass  # not implemented yet
class module_TimeSeries:
    @cache
    def timeseries(self):
        # One row per (message, word): occurrence count plus wordfreq corpus frequency.
        dictionary = defaultdict(lambda: [0, None])
        for message in self.messages:
            for word in message.words:
                index = (self.name, message.id, word.word, word.lang, message.time)
                dictionary[index][0] += 1
                if dictionary[index][1] is None:
                    dictionary[index][1] = word.freq
        df = pd.DataFrame.from_dict(dictionary, orient="index", columns=["count", "freq"])
        df.index = pd.MultiIndex.from_tuples(df.index, names=["name", "id", "word", "lang", "time"])
        return df.reset_index()

    @cache
    def timebins(self, timebin, func=np.sum, exclude=("id",)):
        # exclude is a tuple, not a list: @cache requires hashable arguments.
        cols = [col for col in ["name", "id", "word", "lang", "freq"] if col not in exclude]
        df = (
            self.timeseries()
            .drop(columns=list(exclude))
            .groupby([pd.Grouper(key="time", freq=timebin), *cols])
            .agg(func)
            .reset_index()
        )
        df["rel_freq"] = df["count"] / df.groupby("time")["count"].transform("sum")
        df["overrep"] = df["rel_freq"] / df["freq"]
        return df
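# Illustrative usage (assumes a populated AllModules instance like `person` below):
# person.timebins("7D").sort_values("overrep", ascending=False).head()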
class AllModules(User, module_WordCounts, module_TimeSeries):
    pass
telegram_uploader = widgets.FileUpload(
    accept='.json',
    multiple=False,
)
display(telegram_uploader)
vk_uploader = widgets.FileUpload(
    accept='.json',
    multiple=False,
)
display(vk_uploader)
user_choice = widgets.ToggleButtons(
    options=["mine", "not mine"],
    description="Whose chat to analyze?",
    disabled=False,
    tooltips=["Your chat", "Other person's chat"],
)
display(user_choice)
chat = Chat(telegram_uploader, vk_uploader, user_class=AllModules)
display(chat.me.word_counts_sorted())
if chat.me is not None and chat.you is not None:
    person = chat.me
    if user_choice.value != "mine":
        person = chat.you
    display(person.timeseries())
if chat.me is not None and chat.you is not None:
    tb = person.timebins("5Y")
    tb = tb.sort_values("overrep", ascending=False)
    display(tb)
    tb.to_excel("test.xlsx")  # requires openpyxl
source = pd.concat([chat.me.timebins("1D"), chat.you.timebins("1D")]).reset_index(drop=True)
display(type(source))
display(source)
alt.data_transformers.disable_max_rows()
interval = alt.selection_interval(encodings=["x"])
word_selection = alt.selection_point(
    fields=['word'],
    value='я',
    empty=False,
    bind=alt.binding(
        input='search',
        placeholder="beloved",
        name='Word selection: ',
    ),
)
color_scale = alt.Scale(domain=[chat.me.name, chat.you.name], range=["#8a1fb4", "#77e3af"])
#columns = source[["time", "name", "word", "lang", "count"]]
chart_base = alt.Chart(source).mark_bar().encode(
    x=alt.X("time:T").title("Date").axis(None),
    y=alt.Y("total_count:Q").impute(value=0).title("Word Count"),
    color=alt.Color("name:N").scale(color_scale).legend(None),
    opacity=alt.value(0.75),
    tooltip=["time:T", "name:N", "total_count:Q"],
).add_params(word_selection).transform_filter(word_selection).transform_aggregate(
    total_count="sum(count)",
    groupby=["time", "name"],
)
rolling_base = alt.Chart(source).mark_area(color="green", line={"color": "green", "opacity": 0.7}).encode(
    x=alt.X('time:T').title("Date").axis(None),
    y=alt.Y('rolling_mean:Q').impute(value=0),
    color=alt.Color("name:N"),
    opacity=alt.value(0.75),
).add_params(word_selection).transform_filter(word_selection).transform_aggregate(
    total_count="sum(count)",
    groupby=["time", "name"],
).transform_window(
    # centered 29-day rolling mean over the daily totals
    rolling_mean='mean(total_count)',
    frame=[-14, 14],
)
middle = chart_base.encode(
    x=alt.X('time:T'),
).mark_text().properties(height=1)
bases = [chart_base, rolling_base, middle]
chart, rolling_mean, middle = [
    base.encode(
        x=alt.X('time:T', scale=alt.Scale(domain=interval.to_dict())).title("Date").axis(None)
    ).properties(width=1000, height=200)
    for base in bases
]
chart_view, rolling_mean_view, middle_view = [base.add_params(interval).properties(width=1000, height=50) for base in bases]
display(
    (chart.transform_filter(alt.datum.name == chat.me.name)
     + rolling_mean.transform_filter(alt.datum.name == chat.me.name))
    & middle.encode(
        x=alt.X('time:T', scale=alt.Scale(domain=interval.to_dict())).title(None),
        y=alt.Y("total_count:Q").title(None).axis(None),
    ).properties(height=1)
    & (chart.transform_filter(alt.datum.name == chat.you.name).encode(
        x=alt.X("time:T").title("Date").axis(None),
        y=alt.Y("total_count:Q").impute(value=0).title("Word Count").sort('descending'),
    ) + rolling_mean.transform_filter(alt.datum.name == chat.you.name))
    & middle_view.encode(y=alt.Y("total_count:Q").title(None).axis(None)).properties(height=20)
)
import pyvista as pv
class MyCustomRoutine:  # noqa: D101
    def __init__(self, mesh):
        self.output = mesh  # Expected PyVista mesh type
        # default parameters
        self.kwargs = {
            'center': (0, 0, 0),
        }
        self.center = {"x": 0, "y": 0, "z": 0}

    def __call__(self, param, value):
        if param in ["x", "y", "z"]:
            # Update one coordinate, then rebuild the center tuple (dict preserves x, y, z order).
            self.center[param] = value
            self.kwargs["center"] = tuple(self.center.values())
        else:
            self.kwargs[param] = value
        self.update()

    def update(self):
        # This is where you call your simulation
        result = pv.Sphere(**self.kwargs)
        self.output.copy_from(result)
starting_mesh = pv.Sphere()
engine = MyCustomRoutine(starting_mesh)
p = pv.Plotter()
p.add_mesh(starting_mesh, show_edges=True)
p.add_slider_widget(
    callback=lambda value: engine('x', float(value)),
    rng=[-5, 5],
    value=0,
    title='x',
    pointa=(0.025, 0.1),
    pointb=(0.31, 0.1),
    style='modern',
    interaction_event='always',
)
p.add_slider_widget(
    callback=lambda value: engine('y', float(value)),
    rng=[-5, 5],
    value=0,
    title='y',
    pointa=(0.35, 0.1),
    pointb=(0.64, 0.1),
    style='modern',
    interaction_event='always',
)
p.add_slider_widget(
    callback=lambda value: engine('z', float(value)),
    rng=[-5, 5],
    value=0,
    title='z',
    pointa=(0.67, 0.1),
    pointb=(0.98, 0.1),
    style='modern',
    interaction_event='always',
)
p.show()