...
Die Bildungsinhalte sind vorwiegend dem Bereich Schulbildung zuzuordnen und auf der Neutralitäts-Skala hoch bewertet (4 bis 5). Dies lässt sich durch das redaktionelle einpflegen der Inhalte erklären.
...
Verteilung der Daten
Ein Großteil der Datensätze ist den Disziplinen: Informatik, Chemie, Physik, Mathematik und Darstellendes Spiel zuzuordnen.
...
Fast alle Inhalte wurden auf der Skala mit 4 oder 5 bewertet, was jedoch im Rahmen der Erwartungen liegt, da von Redaktionen gepflegte Inhalte von eher besserer Qualität sind.
...
Analyse der Textqualität
Da die Beschreibungs- und Volltexte die Grundlage der Bewertung bildeten, wurden diese hinsichtlich ihrer Qualität bewertet.
...
Textanalyse der Beschreibungstexte
...
Textanalyse Volltexte
...
Verteilung der Daten
...
...
Fast alle Inhalte wurden auf der Skala mit 4 oder 5 bewertet, was jedoch im Rahmen der Erwartungen liegt, da von Redaktionen gepflegte Inhalte von eher besserer Qualität sind.
...
Testdurchführung
Für die Testdurchführung wurde ein Python-Script entwickelt, das die Beschreibungs- und Volltexte der Bildungsdatensätze von http://WirLernenOnline.de nutzt, um eine KI-basierte Bewertung der Neutralität durchzuführen.
...
Es zeigt sich auch, dass die LLM-Bewertung auf bestimmte Aspekte der Neutralität stärker fokussiert, wie etwa die Vielfalt der Perspektiven und das Risiko für Fehlinterpretationen. Diese Fokusverschiebung könnte zu wertvollen Ergänzungen im Prozess der Neutralitätsbewertung beitragen, sollte jedoch immer im Kontext der redaktionellen Standards interpretiert werden.
Anlage
Quellen für die Promptentwicklung
siehe hier
Tool für die Volltextgenerierung
Code Block |
---|
# URL-basierte Volltext-Anreicherung mit Goose3
# Anforderungen: pip install beautifulsoup4 streamlit requests goose3
# Script als fulltext_enricher.py speichern
# Start: steamlit run fulltext_enricher.py
import streamlit as st
import json
import os
import datetime
import time
from goose3 import Goose
# List of file extensions to skip (e.g. audio and video formats)
SKIPPED_EXTENSIONS = ['.mp4', '.mp3', '.avi', '.mpeg', '.mov', '.wmv', '.flv']
# Function to scrape a webpage using Goose3 and return the extracted information
def scrape_page(url, follow_redirect=True):
goose = Goose() # Initialize Goose3 with default settings
try:
if isinstance(url, list): # Check if URL is in list format and convert to string
url = url[0]
# Skip URLs with certain extensions
if any(url.lower().endswith(ext) for ext in SKIPPED_EXTENSIONS):
return "skipped", None
article = goose.extract(url=url)
# Extract the relevant information
full_text = article.cleaned_text # Cleaned full text
title = article.title # Extracted title
summary = article.meta_description[:500] if article.meta_description else full_text[:500] # Meta description as summary or first 500 chars
keywords = article.meta_keywords # Extracted meta keywords
top_image = article.top_image.src if article.top_image else None # URL of the top image
# Fallback: If full text is missing, use meta description
if not full_text:
full_text = article.meta_description or "No full text available for this page."
return {
'title': title,
'full_text': full_text,
'summary': summary,
'keywords': keywords,
'top_image': top_image,
'url': url
}
except Exception as e:
st.error(f"Error scraping {url}: {e}")
return None
# Function to process the JSON file and enrich it with scraped data
def process_json(data, url_field, timeout, save_folder, follow_redirect=True, skip_media_files=True, crawl_all=True, num_records=10):
try:
total_records = len(data)
records_to_process = total_records if crawl_all else min(num_records, total_records)
st.write(f"Processing {records_to_process} out of {total_records} records...")
enriched_data = []
for i, record in enumerate(data[:records_to_process], 1):
# Display the progress message for processing records
st.info(f"Processing record {i}/{records_to_process}", icon="ℹ️")
# Extract URL using the selected field
try:
url = eval(f"record{url_field}")
# If URL is in list format, convert it to a string (take the first URL)
if isinstance(url, list):
url = url[0]
except Exception as e:
st.warning(f"No valid URL found for record {i}/{records_to_process}. Skipping... (Error: {e})", icon="⚠️")
continue
if url:
# Skip media files if the option is selected
if skip_media_files and any(url.lower().endswith(ext) for ext in SKIPPED_EXTENSIONS):
st.warning(f"Skipping media file URL #{i}: {url}", icon="⚠️")
continue
# Display the message for scraping the current URL
st.success(f"Scraping URL #{i}: {url}", icon="🟢")
scraped_data = scrape_page(url, follow_redirect=follow_redirect)
if scraped_data != "skipped" and scraped_data:
# Only display the summary preview message without additional status info
if scraped_data['summary']:
st.success(scraped_data['summary'][:250] + "..." if len(scraped_data['summary']) > 250 else scraped_data['summary'])
# Add scraped data to the record
record['additional_data'] = {
'title': scraped_data['title'],
'full_text': scraped_data['full_text'],
'summary': scraped_data['summary'],
'keywords': scraped_data['keywords'],
'top_image': scraped_data['top_image'],
'final_url': scraped_data['url']
}
enriched_data.append(record)
# Introduce delay for the given timeout
time.sleep(timeout)
# Save the enriched JSON file
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
enriched_file_name = os.path.join(save_folder, f"data_enriched_{timestamp}.json")
with open(enriched_file_name, 'w', encoding='utf-8') as f:
json.dump(enriched_data, f, ensure_ascii=False, indent=4)
st.success(f"Enriched data saved as {enriched_file_name}", icon="🟢")
except Exception as e:
st.error(f"Error processing JSON file: {e}")
# Function to extract available fields from JSON structure
def extract_fields(data):
field_set = set()
# Recursive function to explore the JSON structure
def recurse_json(obj, parent_key=''):
if isinstance(obj, dict):
for key, value in obj.items():
new_key = f"{parent_key}['{key}']" if parent_key else f"['{key}']"
field_set.add(new_key)
recurse_json(value, new_key)
elif isinstance(obj, list):
for item in obj:
recurse_json(item, parent_key)
recurse_json(data)
return sorted(list(field_set))
# Streamlit UI
st.title('JSON Web Scraper and Enricher using Goose3')
# Upload JSON file
uploaded_file = st.file_uploader("Choose a JSON file", type="json")
if uploaded_file:
try:
# Load the JSON data
data = json.load(uploaded_file)
# Extract all field paths from the JSON structure
available_fields = extract_fields(data)
# Allow the user to choose a URL field
url_field = st.selectbox("Select the URL field", available_fields, index=available_fields.index("['properties']['ccm:wwwurl']"))
# Other options for processing
timeout = st.number_input("Enter delay between requests (seconds)", min_value=0, value=0)
save_folder = st.text_input("Folder to save enriched JSON", value=".")
follow_redirects = st.checkbox("Follow redirects", value=True)
# Option to skip media files (audio/video)
skip_media_files = st.checkbox("Skip media files (e.g., .mp4, .mp3, .avi)", value=True)
# Option to process all records or only a limited number
crawl_all = st.checkbox("Crawl all records", value=True)
num_records = st.number_input("Number of records to process", min_value=1, value=10, disabled=crawl_all)
if st.button("Start Processing"):
process_json(data, url_field, timeout, save_folder, follow_redirects, skip_media_files, crawl_all, num_records)
except Exception as e:
st.error(f"Error loading JSON file: {e}")
|
Tool für die Filterung, Analyse und Bestimmung der Textqualität
Code Block |
---|
# Tool für die Betrachtung, Analyse und Filterung von JSON-Files
# Anforderungen: pip install streamlit pandas orjson matplotlib seaborn plotly textstat nltk langdetect textblob
# Script als datamanager_json.py speichern
# Start: streamlit run datamanager_json.py
import streamlit as st
import pandas as pd
import orjson
import os
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import textstat
import re
import nltk
from nltk import ngrams
from collections import Counter, defaultdict
from langdetect import detect, LangDetectException
from textblob import TextBlob
import time
from itertools import cycle
# Download NLTK data
nltk.download('punkt')
# Funktion zum rekursiven Flachlegen von JSON
def flatten_json(y):
out = {}
def flatten(x, name=''):
if isinstance(x, dict):
for a in x:
flatten(x[a], f'{name}{a}_')
elif isinstance(x, list):
# Join list items with a comma
out[name[:-1]] = ', '.join(map(str, x))
else:
out[name[:-1]] = x
flatten(y)
return out
@st.cache_data(show_spinner=False)
def process_uploaded_file(file_path):
try:
records = []
with open(file_path, 'rb') as f:
content = f.read()
data = orjson.loads(content)
if isinstance(data, list):
for record in data:
flat_record = flatten_json(record)
records.append(flat_record)
elif isinstance(data, dict):
flat_record = flatten_json(data)
records.append(flat_record)
else:
st.error("Nicht unterstützte JSON-Struktur.")
return pd.DataFrame()
df = pd.DataFrame(records)
st.success(f"JSON-Daten erfolgreich in DataFrame konvertiert. Spalten: {len(df.columns)}")
st.write(f"Anzahl der Datensätze: {len(df)}")
return df
except Exception as e:
st.error(f"Fehler beim Verarbeiten der Datei: {e}")
return pd.DataFrame()
def merge_similar_fields(df):
pattern = re.compile(r'^(.*?)(?:_\d+)?$')
base_columns = {}
for col in df.columns:
match = pattern.match(col)
if match:
base_name = match.group(1)
if base_name not in base_columns:
base_columns[base_name] = []
base_columns[base_name].append(col)
for base, cols in base_columns.items():
if len(cols) > 1:
df[base] = df[cols].apply(lambda row: ', '.join(row.dropna().astype(str)), axis=1)
df.drop(columns=cols, inplace=True)
return df
def calculate_fill_status(df):
fill_status = df.notnull().mean() * 100
fill_status = fill_status.sort_values(ascending=False)
return fill_status
def get_all_fields(data, parent_key='', fields=None):
if fields is None:
fields = set()
if isinstance(data, dict):
for key, value in data.items():
full_key = f'{parent_key}.{key}' if parent_key else key
fields.add(full_key)
if isinstance(value, dict):
get_all_fields(value, full_key, fields)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
get_all_fields(item, full_key, fields)
elif isinstance(data, list):
for item in data:
if isinstance(item, dict):
get_all_fields(item, parent_key, fields)
return fields
def load_json(file_path):
with open(file_path, "rb") as f:
return orjson.loads(f.read())
def save_json(data, file_path):
with open(file_path, "wb") as f:
f.write(orjson.dumps(data, option=orjson.OPT_INDENT_2))
def list_json_files(directory):
return [file for file in os.listdir(directory) if file.endswith(".json")]
def preview_data(data, index=0):
if 0 <= index < len(data):
return data[index]
return {}
def get_nested_value(data, path):
keys = path.split(".")
for key in keys:
if isinstance(data, list):
# Extrahiere Werte für den Schlüssel aus jedem Dict in der Liste
next_data = []
for item in data:
if isinstance(item, dict) and key in item:
next_data.append(item[key])
data = next_data if next_data else None
elif isinstance(data, dict):
data = data.get(key)
else:
return None
if data is None:
return None
# Flatten the list if it's a list of lists
if isinstance(data, list):
flat_data = []
for item in data:
if isinstance(item, list):
flat_data.extend(item)
else:
flat_data.append(item)
return flat_data
return data
def is_field_empty(value):
"""Prüft, ob ein Feld als 'leer' betrachtet wird (z.B. None, leere Strings, Listen, Dicts)."""
if value is None:
return True
if isinstance(value, str) and value.strip() == "":
return True
if isinstance(value, list) and len(value) == 0:
return True
if isinstance(value, dict) and len(value) == 0:
return True
return False
def remove_fields(data, fields_to_remove):
for item in data:
for field in fields_to_remove:
keys = field.split(".")
current_dict = item
for key in keys[:-1]:
if key in current_dict and isinstance(current_dict[key], dict):
current_dict = current_dict[key]
else:
current_dict = {}
break
if keys[-1] in current_dict:
del current_dict[keys[-1]]
return data
def current_timestamp():
return time.strftime("%Y%m%d_%H%M%S")
def text_analysis(df, text_field, min_chars=0):
try:
texts = df[text_field].dropna().astype(str)
except KeyError:
st.error(f"Feld '{text_field}' existiert nicht.")
return
if min_chars > 0:
texts = texts[texts.str.len() >= min_chars]
if 'text_index' not in st.session_state:
st.session_state.text_index = 1
st.subheader("Texte durchsuchen")
if not texts.empty:
# Verwenden Sie Streamlit's native Funktionen für die Navigation
st.markdown("### Vorschau des Textes")
max_index = len(texts)
st.session_state.text_index = st.number_input(
"Datensatznummer",
min_value=1,
max_value=max_index,
value=st.session_state.text_index if st.session_state.text_index <= max_index else max_index,
step=1,
key='text_navigation'
)
current_text = texts.iloc[st.session_state.text_index - 1]
st.text_area("Text", value=current_text, height=200, key='text_display')
st.write(f"**Datensatz Nummer:** {st.session_state.text_index}")
st.subheader("Textstatistik")
num_chars = texts.str.len().sum()
num_words = texts.apply(lambda x: len(x.split())).sum()
avg_chars = texts.str.len().mean()
avg_words = texts.apply(lambda x: len(x.split())).mean()
st.write(f"**Gesamtanzahl der Zeichen:** {num_chars}")
st.write(f"**Gesamtanzahl der Wörter:** {num_words}")
st.write(f"**Durchschnittliche Zeichen pro Text:** {avg_chars:.2f}")
st.write(f"**Durchschnittliche Wörter pro Text:** {avg_words:.2f}")
# **Lesbarkeitsmetriken**
st.subheader("Lesbarkeitsmetriken")
st.markdown("""
**Flesch Reading Ease:** Bewertet, wie leicht ein Text zu lesen ist. Höhere Werte bedeuten leichter lesbare Texte.
**Flesch-Kincaid Grade:** Gibt das Schulniveau an, das für das Verständnis des Textes erforderlich ist.
**Gunning Fog Index:** Misst die Anzahl der Jahre formaler Bildung, die ein Leser benötigt, um den Text zu verstehen.
**SMOG Index:** Schätzt die erforderlichen Jahre formaler Bildung basierend auf der Anzahl der polysyllabischen Wörter.
""")
readability_df = pd.DataFrame({
'Flesch Reading Ease': texts.apply(textstat.flesch_reading_ease),
'Flesch-Kincaid Grade': texts.apply(textstat.flesch_kincaid_grade),
'Gunning Fog Index': texts.apply(textstat.gunning_fog),
'SMOG Index': texts.apply(textstat.smog_index)
})
readability_summary = readability_df.mean().round(2)
st.write(readability_summary.to_frame(name='Durchschnitt').T)
st.markdown("""
**Interpretation der Lesbarkeitsmetriken:**
- **Flesch Reading Ease:** Werte zwischen 60-70 sind für die meisten Erwachsenen gut verständlich.
- **Flesch-Kincaid Grade:** Ein Wert von 8 bedeutet, dass ein Schüler der 8. Klasse den Text verstehen sollte.
- **Gunning Fog Index:** Ein Wert von 12 entspricht dem Niveau eines Highschool-Abschlusses.
- **SMOG Index:** Gibt die geschätzten Jahre formaler Bildung an, die für das Verständnis des Textes erforderlich sind.
""")
# **Sentiment-Analyse**
st.subheader("Sentiment-Analyse")
st.markdown("""
**Sentiment-Analyse:** Bestimmt die emotionale Tonalität der Texte. Die Kategorien sind:
- **Positiv:** Der Text drückt positive Gefühle aus.
- **Negativ:** Der Text drückt negative Gefühle aus.
- **Neutral:** Der Text drückt weder positive noch negative Gefühle aus.
""")
sentiments = texts.apply(lambda x: TextBlob(x).sentiment.polarity)
sentiment_counts = sentiments.apply(lambda x: 'Positiv' if x > 0 else ('Negativ' if x < 0 else 'Neutral')).value_counts()
sentiment_counts_df = sentiment_counts.reset_index()
sentiment_counts_df.columns = ['Sentiment', 'Anzahl der Texte']
fig2 = px.bar(
sentiment_counts_df,
x='Sentiment',
y='Anzahl der Texte',
labels={'Anzahl der Texte': 'Anzahl der Texte'},
title="Verteilung der Sentiment-Kategorien",
hover_data={'Anzahl der Texte': True}
)
fig2.update_traces(marker_color='blue')
fig2.update_layout(xaxis_title='Sentiment', yaxis_title='Anzahl der Texte')
st.plotly_chart(fig2, use_container_width=True, key='sentiment_plot')
st.markdown("""
**Interpretation der Sentiment-Analyse:**
- **Positiv:** Ein hoher Anteil positiver Texte kann auf eine optimistische Stimmung in den Daten hinweisen.
- **Negativ:** Ein hoher Anteil negativer Texte kann auf Herausforderungen oder Kritik in den Daten hinweisen.
- **Neutral:** Ein hoher Anteil neutraler Texte deutet auf sachliche oder informative Inhalte hin.
""")
# **Spracherkennung**
st.subheader("Spracherkennung")
st.markdown("""
**Spracherkennung:** Identifiziert die Sprache der Texte, um sicherzustellen, dass alle Texte in der erwarteten Sprache verfasst sind.
""")
def detect_language(text):
try:
return detect(text)
except LangDetectException:
return "Unbekannt"
languages = texts.apply(detect_language)
language_counts = languages.value_counts()
language_counts_df = language_counts.reset_index()
language_counts_df.columns = ['Sprache', 'Anzahl der Texte']
fig3 = px.bar(
language_counts_df,
x='Sprache',
y='Anzahl der Texte',
labels={'Anzahl der Texte': 'Anzahl der Texte'},
title="Verteilung der erkannten Sprachen",
hover_data={'Anzahl der Texte': True}
)
fig3.update_traces(marker_color='orange')
fig3.update_layout(xaxis_title='Sprache', yaxis_title='Anzahl der Texte')
st.plotly_chart(fig3, use_container_width=True, key='language_plot')
st.markdown("""
**Interpretation der Spracherkennung:**
- **Sprache:** Die erkannten Sprachen geben Aufschluss darüber, welche Sprachen in den Textdaten vorherrschen.
- **Unbekannt:** Ein hoher Anteil an "Unbekannt" kann auf unklare oder gemischte Sprachinhalte hinweisen.
""")
# **Lexikalische Vielfalt**
st.subheader("Lexikalische Vielfalt")
st.markdown("""
**Lexikalische Vielfalt (Typ-Token-Verhältnis, TTR):** Misst die Vielfalt des verwendeten Wortschatzes. Ein höheres TTR weist auf eine größere Wortvielfalt hin, was auf eine reichhaltigere und vielfältigere Sprache im Text hindeutet.
""")
def type_token_ratio(text):
tokens = text.split()
types = set(tokens)
return len(types) / len(tokens) if len(tokens) > 0 else 0
df['TTR'] = texts.apply(type_token_ratio)
ttr_summary = df['TTR'].describe()
st.write("**Statistik des Typ-Token-Verhältnisses (TTR):**")
st.write(ttr_summary)
fig5 = px.histogram(
df,
x='TTR',
nbins=20,
title="Verteilung des Typ-Token-Verhältnisses (TTR)",
labels={'TTR': 'TTR', 'count': 'Anzahl der Texte'},
opacity=0.75
# histnorm='count' entfernt, da 'count' ungültig ist und 'count' das Standardverhalten ist
)
fig5.update_traces(marker_color='green')
st.plotly_chart(fig5, use_container_width=True, key='ttr_plot')
st.markdown("""
**Interpretation der lexikalischen Vielfalt:**
- **Höheres TTR:** Größere Vielfalt im Wortschatz, was auf abwechslungsreichere und reichhaltigere Texte hinweist.
- **Niedrigeres TTR:** Weniger Vielfalt im Wortschatz, was auf wiederholende oder monotone Sprache hinweisen kann.
""")
# **Duplikat-Erkennung**
st.subheader("Duplikat-Erkennung")
st.markdown("""
**Duplikat-Erkennung:** Identifiziert doppelte oder nahezu doppelte Texte, um Redundanzen in den Daten zu vermeiden.
""")
duplicate_counts = df[text_field].duplicated().sum()
st.write(f"**Anzahl der doppelten Texte:** {duplicate_counts}")
if duplicate_counts > 0:
duplicates = df[df[text_field].duplicated(keep=False)]
st.write("**Doppelte Texte:**")
st.write(duplicates[[text_field]])
# **N-Gramm-Analyse**
st.subheader("N-Gramm-Analyse")
st.markdown("""
**N-Gramm-Analyse:** Analysiert häufig vorkommende Phrasen (Bigrams), um gängige Ausdrucksweisen oder Themen zu identifizieren.
""")
def get_ngrams(text, n=2):
tokens = nltk.word_tokenize(text)
return list(ngrams(tokens, n))
bigrams = texts.apply(lambda x: get_ngrams(x, 2)).explode()
bigram_counts = Counter(bigrams).most_common(20)
bigram_df = pd.DataFrame(bigram_counts, columns=['Bigram', 'Anzahl'])
bigram_df['Bigram'] = bigram_df['Bigram'].apply(lambda x: ' '.join(x))
fig6 = px.bar(
bigram_df,
x='Anzahl',
y='Bigram',
orientation='h',
labels={'Anzahl': 'Anzahl der Vorkommen'},
title="Top 20 Bigrams",
hover_data={'Anzahl': True}
)
fig6.update_traces(marker_color='magenta')
st.plotly_chart(fig6, use_container_width=True, key='bigram_plot')
st.markdown("""
**Interpretation der N-Gramm-Analyse:**
- **Häufige Bigrams:** Die am häufigsten vorkommenden Bigrams können auf gängige Phrasen oder Themen in den Texten hinweisen.
""")
def json_filter_tab(df):
st.header("🔄 Datenfilter")
# Da die JSON-Datei bereits zentral geladen ist, keine weitere Auswahl nötig
st.info("Verwenden Sie die bereits geladene JSON-Datei für die Filterung.")
data = df.to_dict(orient='records')
all_fields = set(df.columns.tolist())
# Filter 1: Leere Felder (standardmäßig deaktiviert)
empty_field_filter_active = st.checkbox("🚫 Leere Felder filtern", value=False)
if empty_field_filter_active:
selected_empty_fields = st.multiselect(
"Felder zur Prüfung auf leere Werte auswählen",
options=list(all_fields),
default=[]
)
st.warning("Dieser Filter entfernt Datensätze, bei denen ausgewählte Felder keine Werte enthalten.")
# Filter 2: Feld-Wert Kombination mit Operatoren und Auto-Vervollständigung
field_value_filter_active = st.checkbox("🔍 Feld-Wert Kombinationen filtern")
if field_value_filter_active:
st.warning("Dieser Filter entfernt Datensätze, bei denen die angegebenen Feld-Wert-Kombinationen nicht zutreffen.")
field_value_filters = []
field_value_count = st.number_input("Anzahl der Feld-Wert-Kombinationen", min_value=1, value=1, step=1)
operators = ["=", "!=", ">", "<", ">=", "<="]
operator_map = {
"=": lambda a, b: a == b,
"!=": lambda a, b: a != b,
">": lambda a, b: a > b,
"<": lambda a, b: a < b,
">=": lambda a, b: a >= b,
"<=": lambda a, b: a <= b
}
for i in range(int(field_value_count)):
col1, col2, col3 = st.columns(3)
with col1:
field = st.selectbox(f"Feld {i+1}", options=list(all_fields), key=f"filter_field_{i}")
with col2:
operator = st.selectbox(f"Operator {i+1}", options=operators, key=f"filter_operator_{i}")
with col3:
value = st.text_input(f"Wert {i+1}", key=f"filter_value_{i}")
field_value_filters.append((field, operator, value))
# Filter 3: Mindestzeichenlänge
length_filter_active = st.checkbox("✂️ Filter nach Mindestzeichenlänge")
if length_filter_active:
selected_length_fields = st.multiselect("Felder zur Zeichenlänge auswählen", options=list(all_fields))
min_length = st.number_input("Mindestzeichenlänge", min_value=1, value=30)
st.warning("Dieser Filter entfernt Datensätze, bei denen die Länge der ausgewählten Felder kürzer als die angegebene Mindestlänge ist.")
# Filter 4: Gleichverteilung
balancing_filter_active = st.checkbox("⚖️ Filter zur Gleichverteilung")
if balancing_filter_active:
selected_balancing_fields = st.multiselect("Felder für Gleichverteilung auswählen", options=list(all_fields))
total_count = st.number_input("Gesamtanzahl der Datensätze nach Gleichverteilung", min_value=1, value=100)
st.warning("Dieser Filter reduziert die Daten auf eine Gesamtanzahl und verteilt sie möglichst gleichmäßig auf die angegebenen Felder.")
# Filter 5: Felder aus JSON entfernen
remove_fields_filter_active = st.checkbox("🗑️ Felder aus JSON entfernen")
if remove_fields_filter_active:
fields_to_remove = st.multiselect("Wähle die Felder, die entfernt werden sollen", options=list(all_fields), default=[])
st.warning("Dieser Filter entfernt ausgewählte Felder aus den Datensätzen.")
# Filter 6: Duplizierte Datensätze entfernen
duplicate_filter_active = st.checkbox("🔁 Duplikate entfernen")
if duplicate_filter_active:
duplicate_fields = st.multiselect("Wähle die Felder, auf denen die Duplikate basieren sollen", options=list(all_fields), default=[])
st.warning("Dieser Filter entfernt doppelte Datensätze basierend auf den ausgewählten Feldern.")
# Statusmeldungen und Debugging Informationen
if st.button("✅ Daten filtern und speichern"):
st.info("Starte Filterprozess...")
filtered_data = data.copy() # Kopiere Originaldaten
# Leere Felder filtern
if empty_field_filter_active and selected_empty_fields:
st.info("🚫 Leere Felder filtern...")
filtered_data = [
item for item in filtered_data
if all(
(field_value := get_nested_value(item, field)) is not None
and not is_field_empty(field_value)
for field in selected_empty_fields
)
]
st.write(f"Anzahl der Datensätze nach Filterung leere Felder: {len(filtered_data)}")
# Feld-Wert Kombinationen filtern
if field_value_filter_active and field_value_filters:
st.info("🔍 Feld-Wert Kombinationen filtern...")
for field, operator, value in field_value_filters:
op_func = operator_map[operator]
try:
# Versuche den Wert in einen numerischen Typ zu konvertieren, wenn möglich
try:
value_converted = float(value)
except ValueError:
value_converted = value
filtered_data = [
item for item in filtered_data
if (field_value := get_nested_value(item, field)) is not None and (
(isinstance(field_value, list) and any(
isinstance(v, (int, float, str)) and op_func(v, value_converted) for v in field_value
)) or (isinstance(field_value, (int, float, str)) and op_func(field_value, value_converted))
)
]
except TypeError:
st.error(f"Der Wert im Feld '{field}' kann nicht mit dem Operator '{operator}' verglichen werden.")
st.write(f"Anzahl der Datensätze nach Feld-Wert Filter: {len(filtered_data)}")
# Zeichenlänge filtern
if length_filter_active and selected_length_fields:
st.info("✂️ Filterung nach Zeichenlänge...")
filtered_data = [
item for item in filtered_data
if all(
(field_value := get_nested_value(item, field)) is not None
and (
(isinstance(field_value, str) and len(field_value) >= min_length)
or (isinstance(field_value, list) and any(isinstance(v, str) and len(v) >= min_length for v in field_value))
)
for field in selected_length_fields
)
]
st.write(f"Anzahl der Datensätze nach Filterung der Mindestzeichenlänge: {len(filtered_data)}")
# Gleichverteilung filtern
if balancing_filter_active and selected_balancing_fields:
st.info("⚖️ Starte Gleichverteilung...")
field_groups = defaultdict(list)
for item in filtered_data:
# Erstelle einen hashbaren Schlüssel, indem Listen in Tupel umgewandelt werden
key = tuple(
tuple(get_nested_value(item, field)) if isinstance(get_nested_value(item, field), list) else get_nested_value(item, field)
for field in selected_balancing_fields
)
field_groups[key].append(item)
balanced_data = []
groups = list(field_groups.values())
if groups:
group_cycle = cycle(groups)
while len(balanced_data) < total_count and groups:
try:
group = next(group_cycle)
if group:
balanced_data.append(group.pop(0))
if not group:
groups.remove(group)
group_cycle = cycle(groups)
except StopIteration:
break
filtered_data = balanced_data[:total_count]
st.write(f"Anzahl der Datensätze nach Gleichverteilung: {len(filtered_data)}")
# Duplikate entfernen
if duplicate_filter_active and duplicate_fields:
st.info("🔁 Duplikate entfernen...")
initial_count = len(filtered_data)
if duplicate_fields:
filtered_data = pd.DataFrame(filtered_data).drop_duplicates(subset=duplicate_fields, keep='first').to_dict(orient='records')
filtered_count = len(filtered_data)
st.write(f"Duplikate entfernt: {initial_count - filtered_count}")
st.write(f"Anzahl der verbleibenden Datensätze: {filtered_count}")
else:
st.warning("Bitte wählen Sie mindestens ein Feld für die Duplikatenfilterung aus.")
# Felder entfernen
if remove_fields_filter_active and fields_to_remove:
st.info("🗑️ Felder entfernen...")
filtered_data = remove_fields(filtered_data, fields_to_remove)
st.write(f"Anzahl der Datensätze nach Entfernen der Felder: {len(filtered_data)} (Anzahl bleibt gleich)")
# Speichern der gefilterten Daten mit Namenszusätzen
timestamp = current_timestamp()
filters_applied = []
if empty_field_filter_active and selected_empty_fields:
filters_applied.append("emptyfields")
if field_value_filter_active and field_value_filters:
filters_applied.append("fieldvalue")
if length_filter_active and selected_length_fields:
filters_applied.append(f"minlength{min_length}")
if balancing_filter_active and selected_balancing_fields:
filters_applied.append("balancing")
if duplicate_filter_active and duplicate_fields:
filters_applied.append("duplicates")
if remove_fields_filter_active and fields_to_remove:
filters_applied.append("removefields")
output_filename = f"{st.session_state.get('selected_file', 'output').split('.')[0]}_{'_'.join(filters_applied)}_{timestamp}.json"
output_path = os.path.join(os.getcwd(), output_filename)
save_json(filtered_data, output_path)
st.write(f"Anzahl der Datensätze nach dem Speichern: {len(filtered_data)}")
st.success(f"Gefilterte Daten wurden gespeichert als: {output_filename}")
def data_viewer_tab(df):
st.header("📁 Datenbetrachter")
with st.expander("🔍 Vorschau des Datensatzes"):
if 'viewer_index' not in st.session_state:
st.session_state.viewer_index = 0
current_record = preview_data(df.to_dict(orient='records'), st.session_state.viewer_index)
st.json(current_record)
# Anzeige des aktuellen Datensatzes nach dem Button-Klick
st.write(f"**Datensatz Nummer:** {st.session_state.viewer_index + 1}")
# Buttons zum Blättern (jetzt unter den Elementen)
col_prev, col_next = st.columns([1,1])
with col_prev:
if st.button("⬅️ Vorheriger Datensatz", key='prev_viewer'):
if st.session_state.viewer_index > 0:
st.session_state.viewer_index -= 1
with col_next:
if st.button("➡️ Nächster Datensatz", key='next_viewer'):
if st.session_state.viewer_index < len(df) - 1:
st.session_state.viewer_index += 1
st.subheader("📊 Datensätze anzeigen")
st.markdown("**Hinweis:** Große Datensätze können sehr lange dauern, um angezeigt zu werden.")
col_start, col_end = st.columns(2)
with col_start:
start_num = st.number_input("Start Datensatz Nummer", min_value=1, value=1, step=1, key='start_num')
with col_end:
end_num = st.number_input("Ende Datensatz Nummer", min_value=1, value=min(len(df), 10), step=1, key='end_num')
st.write("") # Leerzeile
if st.button("🔄 Daten anzeigen"):
if end_num < start_num:
st.error("Ende Nummer muss größer oder gleich Start Nummer sein.")
elif end_num > len(df):
st.error(f"Ende Nummer darf nicht größer als die Anzahl der Datensätze ({len(df)}) sein.")
else:
st.write(df.iloc[start_num-1:end_num])
def werteverteilung_tab(df):
st.header("📈 Werteverteilung")
metadata_fields = df.select_dtypes(include=['object', 'category']).columns.tolist()
if not metadata_fields:
st.write("Keine Metadatenfelder gefunden.")
else:
search_field = st.text_input("Feld suchen", "", key='metadata_search')
if search_field:
filtered_fields = [field for field in metadata_fields if search_field.lower() in field.lower()]
else:
filtered_fields = metadata_fields
if filtered_fields:
selected_fields = st.multiselect("Wähle Metadatenfelder zur Visualisierung", filtered_fields, key='metadata_select_multi')
if selected_fields:
for field in selected_fields:
# Handle multiple values by splitting
value_series = df[field].dropna().astype(str).str.split(', ').explode()
value_counts = value_series.value_counts().head(20)
st.write(value_counts.to_frame().reset_index().rename(columns={'index': field, field: 'Anzahl'}))
# Truncate labels for readability
df_counts = value_counts.to_frame().reset_index()
df_counts.columns = [field, 'Anzahl']
df_counts[field] = df_counts[field].apply(lambda x: x[:20] + '...' if len(x) > 20 else x)
fig = px.bar(
df_counts,
x='Anzahl',
y=field,
orientation='h',
labels={'Anzahl': 'Anzahl der Vorkommen', field: 'Feld'},
title="", # Entfernt den Titel über der Grafik
hover_data={'Anzahl': True}
)
fig.update_traces(marker_color='blue')
st.plotly_chart(fig, use_container_width=True, key=f'werteverteilung_{field}')
else:
st.write("Keine Felder gefunden, die dem Suchbegriff entsprechen.")
def fuellstandsanalyse_tab(df):
st.header("📊 Füllstandsanalyse")
st.write("Filtere Daten basierend auf Feldern und Werten und analysiere den Füllstand der Metadaten.")
# Auswahl der Felder und Werte zum Filtern
st.subheader("🔍 Filter auswählen")
selected_fill_fields = st.multiselect(
"Felder zur Filterung auswählen",
options=df.columns.tolist(),
default=[]
)
fill_field_values = {}
for field in selected_fill_fields:
unique_values = df[field].dropna().unique().tolist()
# Optimierung: Verwenden Sie Caching für einzigartige Werte
unique_values = st.cache_data(lambda x: x)(unique_values)
selected_values = st.multiselect(f"Wähle Werte für {field}", unique_values, default=[], key=f"fill_{field}")
fill_field_values[field] = selected_values
# Option, ob Filter eine gemeinsame Datenmenge bilden oder getrennt betrachtet werden
join_option = st.radio(
"Wie sollen die Filter angewendet werden?",
options=["Getrennt betrachten", "Gemeinsame Datenmenge bilden"],
index=0
)
# Auswahl der Felder zur Anzeige des Füllstands
st.subheader("📈 Füllstand anzeigen für:")
display_fill_fields = st.multiselect(
"Wähle Metadatenfelder zur Anzeige des Füllstands",
options=df.columns.tolist(),
default=[]
)
if st.button("🔄 Füllstand analysieren"):
st.info("Starte Füllstandsanalyse...")
if selected_fill_fields and any(fill_field_values[field] for field in selected_fill_fields):
if join_option == "Gemeinsame Datenmenge bilden":
# Filtern der Daten, die alle ausgewählten Feld-Wert-Kombinationen erfüllen
filtered_df = df.copy()
for field, values in fill_field_values.items():
if values:
filtered_df = filtered_df[filtered_df[field].isin(values)]
subsets = {"Gemeinsame Datenmenge": filtered_df}
else:
# Jede Feld-Wert-Kombination als separate Teilmenge
subsets = {}
for field, values in fill_field_values.items():
if values:
for value in values:
subset_name = f"{field} = {value}"
subsets[subset_name] = df[df[field].isin([value])]
else:
# Keine Filter angewendet, eine einzige Teilmenge
subsets = {"Alle Daten": df}
if display_fill_fields:
# Begrenzen der Anzahl der Grafiken, um die Übersichtlichkeit zu wahren
max_columns = 2 # Ändert die Anzahl der Spalten von 3 auf 2
num_subsets = len(subsets)
num_cols = min(max_columns, num_subsets)
cols = st.columns(num_cols)
for idx, (subset_name, subset_df) in enumerate(subsets.items()):
col = cols[idx % num_cols]
with col:
# Füge einen Hinweistext über der Grafik hinzu
st.markdown(f"**Filter:** {subset_name}")
# Entfernt die Überschrift über den Grafiken
fill_status = subset_df[display_fill_fields].notnull().mean() * 100
fill_status = fill_status.sort_values(ascending=False)
# Dynamische Anpassung der Grafikgröße
num_bars = len(fill_status)
if num_bars == 1:
fig_height = 400
else:
fig_height = max(400, num_bars * 50)
# Daten vorbereiten für zweifarbige Balken
fill_percentage = fill_status
empty_percentage = 100 - fill_status
fill_data = pd.DataFrame({
'Metadatenfeld': fill_status.index,
'Gefüllt (%)': fill_percentage.values,
'Leer (%)': empty_percentage.values
})
# Melt the DataFrame für gestapelte Balkendiagramme
fill_data_melted = fill_data.melt(id_vars='Metadatenfeld', value_vars=['Gefüllt (%)', 'Leer (%)'], var_name='Status', value_name='Prozent')
fig = px.bar(
fill_data_melted,
x='Prozent',
y='Metadatenfeld',
color='Status',
orientation='h',
title="", # Entfernt den Titel über der Grafik
labels={'Prozent': 'Prozent (%)', 'Metadatenfeld': 'Metadatenfeld'},
hover_data={'Prozent': True, 'Status': True}
)
fig.update_layout(barmode='stack', height=fig_height, showlegend=True)
st.plotly_chart(fig, use_container_width=True, key=f'fuellstand_plot_{idx}')
else:
st.warning("Bitte wählen Sie mindestens ein Feld zur Anzeige des Füllstands aus.")
def text_analysis_tab(df):
st.header("📝 Textanalyse")
text_fields = df.select_dtypes(include=['object', 'category']).columns.tolist()
if not text_fields:
st.write("Keine Textfelder gefunden.")
else:
search_text_field = st.text_input("Feld suchen", "", key='text_search')
if search_text_field:
filtered_text_fields = [field for field in text_fields if search_text_field.lower() in field.lower()]
else:
filtered_text_fields = text_fields
if filtered_text_fields:
selected_text_field = st.selectbox("Wähle ein Metadatenfeld mit Text", filtered_text_fields, key='text_select')
if selected_text_field:
min_chars = st.number_input("Minimale Anzahl der Zeichen zum Filtern", min_value=0, value=0, step=1, key='text_min_chars')
text_analysis(df, selected_text_field, min_chars)
else:
st.write("Keine Textfelder gefunden, die dem Suchbegriff entsprechen.")
def main():
st.set_page_config(page_title="Universelles Datentool für JSON Dateien", layout="wide")
# Seitenleiste für Titel und JSON-Dateiauswahl
with st.sidebar:
st.title("Universelles Datentool für JSON Dateien")
st.write("Dieses Tool ermöglicht die Analyse und Bearbeitung von JSON-Daten mit beliebiger Struktur.")
data_dir = os.getcwd()
json_files = list_json_files(data_dir)
if not json_files:
st.warning(f"Keine JSON-Dateien im Verzeichnis '{data_dir}' gefunden. Bitte laden Sie eine Datei in das Verzeichnis hoch.")
selected_file = None
else:
selected_file = st.selectbox("📂 Wähle eine JSON-Datei aus dem aktuellen Verzeichnis", json_files, key='json_select')
if st.button("🔍 Datei laden") and selected_file:
with st.spinner("Verarbeite Datei..."):
df = process_uploaded_file(os.path.join(data_dir, selected_file))
if not df.empty:
df = merge_similar_fields(df)
st.session_state['df'] = df
st.session_state['selected_file'] = selected_file # Speichern des ausgewählten Dateinamens
# Vermerk am unteren Ende der Sidebar
st.markdown("---")
st.markdown("**by Jan Schachtschabel**")
# Überprüfen, ob Daten geladen sind
if 'df' in st.session_state and not st.session_state['df'].empty:
df = st.session_state['df']
selected_file = st.session_state.get('selected_file', 'output')
# Erstellen der Tabs im Hauptbereich
tabs = st.tabs(["Datenbetrachter", "Werteverteilung", "Füllstandsanalyse", "Textanalyse", "Datenfilter"])
with tabs[0]:
data_viewer_tab(df)
with tabs[1]:
werteverteilung_tab(df)
with tabs[2]:
fuellstandsanalyse_tab(df)
with tabs[3]:
text_analysis_tab(df)
with tabs[4]:
json_filter_tab(df)
if __name__ == "__main__":
main()
|
Tool für die Bewertung
Python-Script zur Durchführung des Tests und der Bewertung mittels LLM
...