Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 111 additions & 53 deletions packages/markitdown/src/markitdown/_markitdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
from urllib.parse import urlparse
from warnings import warn
import requests
import magika
import charset_normalizer
import codecs

from ._stream_info import StreamInfo, _guess_stream_info_from_stream
from ._stream_info import StreamInfo

from .converters import (
PlainTextConverter,
Expand Down Expand Up @@ -107,6 +110,8 @@ def __init__(
else:
self._requests_session = requests_session

self._magika = magika.Magika()

# TODO - remove these (see enable_builtins)
self._llm_client: Any = None
self._llm_model: Union[str | None] = None
Expand Down Expand Up @@ -273,33 +278,28 @@ def convert_local(
path = str(path)

# Build a base StreamInfo object from which to start guesses
base_stream_info = StreamInfo(
base_guess = StreamInfo(
local_path=path,
extension=os.path.splitext(path)[1],
filename=os.path.basename(path),
)

# Extend the base_stream_info with any additional info from the arguments
# Extend the base_guess with any additional info from the arguments
if stream_info is not None:
base_stream_info = base_stream_info.copy_and_update(stream_info)
base_guess = base_guess.copy_and_update(stream_info)

if file_extension is not None:
# Deprecated -- use stream_info
base_stream_info = base_stream_info.copy_and_update(
extension=file_extension
)
base_guess = base_guess.copy_and_update(extension=file_extension)

if url is not None:
# Deprecated -- use stream_info
base_stream_info = base_stream_info.copy_and_update(url=url)
base_guess = base_guess.copy_and_update(url=url)

with open(path, "rb") as fh:
# Prepare a list of configurations to try, starting with the base_stream_info
guesses: List[StreamInfo] = [base_stream_info]
for guess in _guess_stream_info_from_stream(
file_stream=fh, filename_hint=path
):
guesses.append(base_stream_info.copy_and_update(guess))
guesses = self._get_stream_info_guesses(
file_stream=fh, base_guess=base_guess
)
return self._convert(file_stream=fh, stream_info_guesses=guesses, **kwargs)

def convert_stream(
Expand Down Expand Up @@ -332,21 +332,6 @@ def convert_stream(
assert base_guess is not None # for mypy
base_guess = base_guess.copy_and_update(url=url)

# Append the base guess, if it's non-trivial
if base_guess is not None:
if base_guess.mimetype is not None or base_guess.extension is not None:
guesses.append(base_guess)
else:
# Create a base guess with no information
base_guess = StreamInfo()

# Create a placeholder filename to help with guessing
placeholder_filename = None
if base_guess.filename is not None:
placeholder_filename = base_guess.filename
elif base_guess.extension is not None:
placeholder_filename = "placeholder" + base_guess.extension

# Check if we have a seekable stream. If not, load the entire stream into memory.
if not stream.seekable():
buffer = io.BytesIO()
Expand All @@ -359,12 +344,9 @@ def convert_stream(
stream = buffer

# Add guesses based on stream content
for guess in _guess_stream_info_from_stream(
file_stream=stream, filename_hint=placeholder_filename
):
guesses.append(base_guess.copy_and_update(guess))

# Perform the conversion
guesses = self._get_stream_info_guesses(
file_stream=stream, base_guess=base_guess or StreamInfo()
)
return self._convert(file_stream=stream, stream_info_guesses=guesses, **kwargs)

def convert_url(
Expand Down Expand Up @@ -435,31 +417,16 @@ def convert_response(
# Deprecated -- use stream_info
base_guess = base_guess.copy_and_update(url=url)

# Add the guess if its non-trivial
guesses: List[StreamInfo] = []
if base_guess.mimetype is not None or base_guess.extension is not None:
guesses.append(base_guess)

# Read into BytesIO
buffer = io.BytesIO()
for chunk in response.iter_content(chunk_size=512):
buffer.write(chunk)
buffer.seek(0)

# Create a placeholder filename to help with guessing
placeholder_filename = None
if base_guess.filename is not None:
placeholder_filename = base_guess.filename
elif base_guess.extension is not None:
placeholder_filename = "placeholder" + base_guess.extension

# Add guesses based on stream content
for guess in _guess_stream_info_from_stream(
file_stream=buffer, filename_hint=placeholder_filename
):
guesses.append(base_guess.copy_and_update(guess))

# Convert
guesses = self._get_stream_info_guesses(
file_stream=buffer, base_guess=base_guess
)
return self._convert(file_stream=buffer, stream_info_guesses=guesses, **kwargs)

def _convert(
Expand Down Expand Up @@ -593,3 +560,94 @@ def register_converter(
self._converters.insert(
0, ConverterRegistration(converter=converter, priority=priority)
)

def _get_stream_info_guesses(
self, file_stream: BinaryIO, base_guess: StreamInfo
) -> List[StreamInfo]:
"""
Given a base guess, attempt to guess or expand on the stream info using the stream content (via magika).
"""
guesses: List[StreamInfo] = []

# Call magika to guess from the stream
cur_pos = file_stream.tell()
try:
stream_bytes = file_stream.read()

result = self._magika.identify_bytes(stream_bytes)
if result.status == "ok" and result.prediction.output.label != "unknown":
# If it's text, also guess the charset
charset = None
if result.prediction.output.is_text:
charset_result = charset_normalizer.from_bytes(stream_bytes).best()
if charset_result is not None:
charset = self._normalize_charset(charset_result.encoding)

# Normalize the first extension listed
guessed_extension = None
if len(result.prediction.output.extensions) > 0:
guessed_extension = "." + result.prediction.output.extensions[0]

# Determine if the guess is compatible with the base guess
compatible = True
if (
base_guess.mimetype is not None
and base_guess.mimetype != result.prediction.output.mime_type
):
compatible = False

if (
base_guess.extension is not None
and base_guess.extension.lstrip(".")
not in result.prediction.output.extensions
):
compatible = False

if (
base_guess.charset is not None
and self._normalize_charset(base_guess.charset) != charset
):
compatible = False

if compatible:
# Add the compatible base guess
guesses.append(
StreamInfo(
mimetype=base_guess.mimetype
or result.prediction.output.mime_type,
extension=base_guess.extension or guessed_extension,
charset=base_guess.charset or charset,
filename=base_guess.filename,
local_path=base_guess.local_path,
url=base_guess.url,
)
)
else:
# The magika guess was incompatible with the base guess, so add both guesses
guesses.append(base_guess)
guesses.append(
StreamInfo(
mimetype=result.prediction.output.mime_type,
extension=guessed_extension,
charset=charset,
filename=base_guess.filename,
local_path=base_guess.local_path,
url=base_guess.url,
)
)
else:
# There were no other guesses, so just add the base guess
guesses.append(base_guess)
finally:
file_stream.seek(cur_pos)

return guesses

def _normalize_charset(self, charset: str) -> str:
"""
Normalize a charset string to a canonical form.
"""
try:
return codecs.lookup(charset).name
except LookupError:
return charset
62 changes: 1 addition & 61 deletions packages/markitdown/src/markitdown/_stream_info.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import mimetypes
import os
from dataclasses import dataclass, asdict
from typing import Optional, BinaryIO, List, TypeVar, Type
from magika import Magika

magika = Magika()
from typing import Optional


@dataclass(kw_only=True, frozen=True)
Expand Down Expand Up @@ -35,58 +30,3 @@ def copy_and_update(self, *args, **kwargs):
new_info.update(kwargs)

return StreamInfo(**new_info)


# Behavior subject to change.
# Do not rely on this outside of this module.
def _guess_stream_info_from_stream(
file_stream: BinaryIO,
*,
filename_hint: Optional[str] = None,
) -> List[StreamInfo]:
"""
Guess StreamInfo properties (mostly mimetype and extension) from a stream.

Args:
- stream: The stream to guess the StreamInfo from.
- filename_hint [Optional]: A filename hint to help with the guessing (may be a placeholder, and not actually be the file name)

Returns a list of StreamInfo objects in order of confidence.
"""
guesses: List[StreamInfo] = []

# Call magika to guess from the stream
cur_pos = file_stream.tell()
try:
result = magika.identify_bytes(file_stream.read())
if result.status == "ok" and result.prediction.output.label != "unknown":
extension = None
if len(result.prediction.output.extensions) > 0:
extension = result.prediction.output.extensions[0]
if extension and not extension.startswith("."):
extension = "." + extension
guesses.append(
StreamInfo(
mimetype=result.prediction.output.mime_type,
extension=extension,
)
)
finally:
file_stream.seek(cur_pos)

# Add a guess purely based on the filename hint
if filename_hint:
try:
# Requires Python 3.13+
mimetype, _ = mimetypes.guess_file_type(filename_hint) # type: ignore
except AttributeError:
mimetype, _ = mimetypes.guess_type(filename_hint)

if mimetype:
guesses.append(
StreamInfo(
mimetype=mimetype, extension=os.path.splitext(filename_hint)[1]
)
)

return guesses
11 changes: 8 additions & 3 deletions packages/markitdown/tests/test_markitdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
FileConversionException,
StreamInfo,
)
from markitdown._stream_info import _guess_stream_info_from_stream

skip_remote = (
True if os.environ.get("GITHUB_ACTIONS") else False
Expand Down Expand Up @@ -265,10 +264,16 @@ def test_stream_info_guesses() -> None:
(os.path.join(TEST_FILES_DIR, "test.xls"), "application/vnd.ms-excel"),
]

markitdown = MarkItDown()
for file_path, expected_mimetype in test_tuples:
with open(file_path, "rb") as f:
guesses = _guess_stream_info_from_stream(
f, filename_hint=os.path.basename(file_path)
guesses = markitdown._get_stream_info_guesses(
f,
StreamInfo(
filename=os.path.basename(file_path),
local_path=file_path,
extension=os.path.splitext(file_path)[1],
),
)
assert len(guesses) > 0
assert guesses[0].mimetype == expected_mimetype
Expand Down