Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions src/littlefs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,29 @@
class LittleFS:
"""Littlefs file system"""

def __init__(self, context: Optional["UserContext"] = None, mount=True, **kwargs) -> None:
def __init__(
self,
context: Optional["UserContext"] = None,
mount=True,
filename_encoding: Optional[str] = None,
**kwargs,
) -> None:
"""
Parameters
----------
filename_encoding : Optional[str]
Encoding used to encode/decode filenames passed to and returned by
the filesystem. littlefs stores names as opaque byte strings, so this
is a free choice. Defaults to :data:`littlefs.lfs.FILENAME_ENCODING`
(``"utf-8"``). Set this when reading an image whose names were written
with a different encoding (e.g. ``"latin-1"`` or ``"shift-jis"``).

Note that littlefs's ``name_max`` limit is measured in *encoded
bytes*, not characters. With a multi-byte encoding such as UTF-8, a
single non-ASCII character consumes 2-4 bytes, so a name can exceed
``name_max`` (default 255) well before it looks long.
"""
self.filename_encoding = filename_encoding or lfs.FILENAME_ENCODING
self.cfg = lfs.LFSConfig(context=context, **kwargs)
self.fs = lfs.LFSFilesystem()

Expand Down Expand Up @@ -204,7 +226,7 @@ def open(
buffering = -1

try:
fh = lfs.file_open(self.fs, fname, mode)
fh = lfs.file_open(self.fs, fname, mode, self.filename_encoding)
except LittleFSError as e:
# Try to map to standard Python exceptions
if e.code == LittleFSError.Error.LFS_ERR_NOENT:
Expand Down Expand Up @@ -251,15 +273,15 @@ def open(

def getattr(self, path: str, typ: Union[str, bytes, int]) -> bytes:
typ = _typ_to_uint8(typ)
return lfs.getattr(self.fs, path, typ)
return lfs.getattr(self.fs, path, typ, self.filename_encoding)

def setattr(self, path: str, typ: Union[str, bytes, int], data: bytes) -> None:
typ = _typ_to_uint8(typ)
lfs.setattr(self.fs, path, typ, data)
lfs.setattr(self.fs, path, typ, data, self.filename_encoding)

def removeattr(self, path: str, typ: Union[str, bytes, int]) -> None:
typ = _typ_to_uint8(typ)
lfs.removeattr(self.fs, path, typ)
lfs.removeattr(self.fs, path, typ, self.filename_encoding)

def listdir(self, path=".") -> List[str]:
"""List directory content
Expand All @@ -274,7 +296,7 @@ def listdir(self, path=".") -> List[str]:
def mkdir(self, path: str) -> int:
"""Create a new directory"""
try:
return lfs.mkdir(self.fs, path)
return lfs.mkdir(self.fs, path, self.filename_encoding)
except errors.LittleFSError as e:
if e.code == LittleFSError.Error.LFS_ERR_EXIST:
msg = "[LittleFSError {:d}] Cannot create a file when that file already exists: '{:s}'.".format(
Expand Down Expand Up @@ -310,7 +332,7 @@ def remove(self, path: str, recursive: bool = False) -> None:
If ``true`` and ``path`` is a directory, recursively remove all children files/folders.
"""
try:
lfs.remove(self.fs, path)
lfs.remove(self.fs, path, self.filename_encoding)
return
except errors.LittleFSError as e:
if e.code == LittleFSError.Error.LFS_ERR_NOENT:
Expand All @@ -326,7 +348,7 @@ def remove(self, path: str, recursive: bool = False) -> None:
# Recursively delete the ``path`` directory
for elem in self.scandir(path):
self.remove(path + "/" + elem.name, recursive=True)
lfs.remove(self.fs, path)
lfs.remove(self.fs, path, self.filename_encoding)

def removedirs(self, name):
"""Remove directories recursively
Expand All @@ -351,7 +373,7 @@ def removedirs(self, name):

def rename(self, src: str, dst: str) -> int:
"""Rename a file or directory"""
return lfs.rename(self.fs, src, dst)
return lfs.rename(self.fs, src, dst, self.filename_encoding)

def rmdir(self, path: str) -> int:
"""Remove a directory
Expand All @@ -362,17 +384,19 @@ def rmdir(self, path: str) -> int:

def scandir(self, path=".") -> Iterator["LFSStat"]:
"""List directory content"""
dh = lfs.dir_open(self.fs, path)
info = lfs.dir_read(self.fs, dh)
while info:
if info.name not in [".", ".."]:
yield info
info = lfs.dir_read(self.fs, dh)
lfs.dir_close(self.fs, dh)
dh = lfs.dir_open(self.fs, path, self.filename_encoding)
try:
info = lfs.dir_read(self.fs, dh, self.filename_encoding)
while info:
if info.name not in [".", ".."]:
yield info
info = lfs.dir_read(self.fs, dh, self.filename_encoding)
finally:
lfs.dir_close(self.fs, dh)

def stat(self, path: str) -> "LFSStat":
"""Get the status of a file or directory"""
return lfs.stat(self.fs, path)
return lfs.stat(self.fs, path, self.filename_encoding)

def unlink(self, path: str) -> int:
"""Remove a file or directory
Expand Down
10 changes: 10 additions & 0 deletions src/littlefs/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def _fs_from_args(args: argparse.Namespace, block_count=None, mount=True, contex
"inline_max": args.inline_max,
"attr_max": args.attr_max,
"file_max": args.file_max,
"filename_encoding": getattr(args, "filename_encoding", None),
}
return LittleFS(context=context, mount=mount, **kwargs)

Expand Down Expand Up @@ -294,6 +295,15 @@ def get_parser():
default=0,
help="Max inline file size; 0 = use library default. Limiting can improve flash usage.",
)
# Host-side encode/decode choice; never stored in the image. The same encoding
# must be used to extract an image as was used to create it, otherwise filenames
# will fail to decode or come out as mojibake.
common_parser.add_argument(
"--filename-encoding",
default=None,
help="Encoding for filenames stored in the image. Defaults to utf-8. "
"Use e.g. latin-1 or shift-jis for images whose names use a different encoding.",
)

subparsers = parser.add_subparsers(required=True, title="Available Commands", dest="command")

Expand Down
22 changes: 12 additions & 10 deletions src/littlefs/lfs.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,19 @@ def mount(fs: LFSFilesystem, cfg: LFSConfig) -> int: ...
def unmount(fs: LFSFilesystem) -> int: ...
def fs_mkconsistent(fs: LFSFilesystem) -> int: ...
def fs_grow(fs: LFSFilesystem, block_count) -> int: ...
def remove(fs: LFSFilesystem, path: str) -> int: ...
def rename(fs: LFSFilesystem, oldpath: str, newpath: str) -> int: ...
def stat(fs: LFSFilesystem, path: str) -> LFSStat: ...
def remove(fs: LFSFilesystem, path: str, filename_encoding: Optional[str] = ...) -> int: ...
def rename(fs: LFSFilesystem, oldpath: str, newpath: str, filename_encoding: Optional[str] = ...) -> int: ...
def stat(fs: LFSFilesystem, path: str, filename_encoding: Optional[str] = ...) -> LFSStat: ...

# Attributes
def getattr(fs: LFSFilesystem, path: str, typ) -> bytes: ...
def setattr(fs: LFSFilesystem, path: str, typ, data) -> None: ...
def removeattr(fs: LFSFilesystem, path: str, typ) -> None: ...
def getattr(fs: LFSFilesystem, path: str, typ, filename_encoding: Optional[str] = ...) -> bytes: ...
def setattr(fs: LFSFilesystem, path: str, typ, data, filename_encoding: Optional[str] = ...) -> None: ...
def removeattr(fs: LFSFilesystem, path: str, typ, filename_encoding: Optional[str] = ...) -> None: ...

# File Handling
def file_open(fs: LFSFilesystem, path: str, flags: Union[str, LFSFileFlag]) -> LFSFile: ...
def file_open(
fs: LFSFilesystem, path: str, flags: Union[str, LFSFileFlag], filename_encoding: Optional[str] = ...
) -> LFSFile: ...

# def file_open_cfg(self, path, flags, config): ...
def file_close(fs: LFSFilesystem, fh: LFSFile) -> int: ...
Expand All @@ -111,9 +113,9 @@ def file_rewind(fs: LFSFilesystem, fh: LFSFile) -> int: ...
def file_size(fs: LFSFilesystem, fh: LFSFile) -> int: ...

# Directory Handling
def mkdir(fs: LFSFilesystem, path: str) -> int: ...
def dir_open(fs: LFSFilesystem, path: str) -> LFSDirectory: ...
def mkdir(fs: LFSFilesystem, path: str, filename_encoding: Optional[str] = ...) -> int: ...
def dir_open(fs: LFSFilesystem, path: str, filename_encoding: Optional[str] = ...) -> LFSDirectory: ...
def dir_close(fs: LFSFilesystem, dh: LFSDirectory) -> int: ...
def dir_read(fs: LFSFilesystem, dh: LFSDirectory) -> LFSStat: ...
def dir_read(fs: LFSFilesystem, dh: LFSDirectory, filename_encoding: Optional[str] = ...) -> Optional[LFSStat]: ...
def dir_tell(fs: LFSFilesystem, dh: LFSDirectory) -> int: ...
def dir_rewind(fs: LFSFilesystem, dh: LFSDirectory) -> int: ...
65 changes: 40 additions & 25 deletions src/littlefs/lfs.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ from littlefs.context import UserContext
from littlefs import errors


FILENAME_ENCODING = 'ascii'
"""Default filename encoding"""
FILENAME_ENCODING = 'utf-8'
"""Default filename encoding.

littlefs stores names as opaque byte strings, so any encoding works on the
C side. UTF-8 is used so that non-ASCII filenames are supported; since ASCII
is a strict subset of UTF-8, existing ASCII names encode/decode identically.
"""

class LFSStat(NamedTuple):
"""Littlefs File / Directory status."""
Expand Down Expand Up @@ -329,50 +334,56 @@ def fs_grow(LFSFilesystem fs, block_count) -> int:
return _raise_on_error(lfs_fs_grow(&fs._impl, block_count))


def remove(LFSFilesystem fs, path):
def remove(LFSFilesystem fs, path, filename_encoding=None):
"""Remove a file or directory

If removing a directory, the directory must be empty.
"""
return _raise_on_error(lfs_remove(&fs._impl, path.encode(FILENAME_ENCODING)))
filename_encoding = filename_encoding or FILENAME_ENCODING
return _raise_on_error(lfs_remove(&fs._impl, path.encode(filename_encoding)))

def rename(LFSFilesystem fs, oldpath, newpath):
def rename(LFSFilesystem fs, oldpath, newpath, filename_encoding=None):
"""Rename or move a file or directory

If the destination exists, it must match the source in type.
If the destination is a directory, the directory must be empty.
"""
return _raise_on_error(lfs_rename(&fs._impl, oldpath.encode(FILENAME_ENCODING),
newpath.encode(FILENAME_ENCODING)))
filename_encoding = filename_encoding or FILENAME_ENCODING
return _raise_on_error(lfs_rename(&fs._impl, oldpath.encode(filename_encoding),
newpath.encode(filename_encoding)))


def stat(LFSFilesystem fs, path):
def stat(LFSFilesystem fs, path, filename_encoding=None):
"""Find info about a file or directory"""
filename_encoding = filename_encoding or FILENAME_ENCODING
cdef lfs_info * info = <lfs_info *>malloc(sizeof(lfs_info))
try:
_raise_on_error(lfs_stat(&fs._impl, path.encode(FILENAME_ENCODING), info))
return LFSStat(info.type, info.size, info.name.decode(FILENAME_ENCODING))
_raise_on_error(lfs_stat(&fs._impl, path.encode(filename_encoding), info))
return LFSStat(info.type, info.size, info.name.decode(filename_encoding))
finally:
free(info)


def getattr(LFSFilesystem fs, path, typ):
def getattr(LFSFilesystem fs, path, typ, filename_encoding=None):
filename_encoding = filename_encoding or FILENAME_ENCODING
buf = bytearray(LFS_ATTR_MAX)
cdef unsigned char[::1] buf_view = buf
attr_size = _raise_on_error(lfs_getattr(&fs._impl, path.encode(FILENAME_ENCODING), typ, &buf_view[0], LFS_ATTR_MAX))
attr_size = _raise_on_error(lfs_getattr(&fs._impl, path.encode(filename_encoding), typ, &buf_view[0], LFS_ATTR_MAX))
return bytes(buf[:attr_size])


def setattr(LFSFilesystem fs, path, typ, data):
def setattr(LFSFilesystem fs, path, typ, data, filename_encoding=None):
filename_encoding = filename_encoding or FILENAME_ENCODING
cdef const unsigned char[::1] buf_view = data
_raise_on_error(lfs_setattr(&fs._impl, path.encode(FILENAME_ENCODING), typ, &buf_view[0], len(data)))
_raise_on_error(lfs_setattr(&fs._impl, path.encode(filename_encoding), typ, &buf_view[0], len(data)))


def removeattr(LFSFilesystem fs, path, typ):
_raise_on_error(lfs_removeattr(&fs._impl, path.encode(FILENAME_ENCODING), typ))
def removeattr(LFSFilesystem fs, path, typ, filename_encoding=None):
filename_encoding = filename_encoding or FILENAME_ENCODING
_raise_on_error(lfs_removeattr(&fs._impl, path.encode(filename_encoding), typ))


def file_open(LFSFilesystem fs, path, flags):
def file_open(LFSFilesystem fs, path, flags, filename_encoding=None):
if isinstance(flags, str):
creating = False
reading = False
Expand Down Expand Up @@ -418,8 +429,9 @@ def file_open(LFSFilesystem fs, path, flags):
flags |= LFSFileFlag.rdwr

flags = int(flags)
filename_encoding = filename_encoding or FILENAME_ENCODING
fh = LFSFile()
_raise_on_error(lfs_file_open(&fs._impl, &fh._impl, path.encode(FILENAME_ENCODING), flags))
_raise_on_error(lfs_file_open(&fs._impl, &fh._impl, path.encode(filename_encoding), flags))
return fh


Expand All @@ -432,7 +444,7 @@ def file_close(LFSFilesystem fs, LFSFile fh):


def file_sync(LFSFilesystem fs, LFSFile fh):
_raise_on_error(lfs_file_sync(&fs._impl, &fh._impl))
return _raise_on_error(lfs_file_sync(&fs._impl, &fh._impl))


def file_read(LFSFilesystem fs, LFSFile fh, size):
Expand Down Expand Up @@ -470,24 +482,27 @@ def file_rewind(LFSFilesystem fs, LFSFile fh):
def file_size(LFSFilesystem fs, LFSFile fh):
return _raise_on_error(lfs_file_size(&fs._impl, &fh._impl))

def mkdir(LFSFilesystem fs, path):
return _raise_on_error(lfs_mkdir(&fs._impl, path.encode(FILENAME_ENCODING)))
def mkdir(LFSFilesystem fs, path, filename_encoding=None):
filename_encoding = filename_encoding or FILENAME_ENCODING
return _raise_on_error(lfs_mkdir(&fs._impl, path.encode(filename_encoding)))

def dir_open(LFSFilesystem fs, path):
def dir_open(LFSFilesystem fs, path, filename_encoding=None):
filename_encoding = filename_encoding or FILENAME_ENCODING
handle = LFSDirectory()
_raise_on_error(lfs_dir_open(&fs._impl, &handle._impl, path.encode(FILENAME_ENCODING)))
_raise_on_error(lfs_dir_open(&fs._impl, &handle._impl, path.encode(filename_encoding)))
return handle

def dir_close(LFSFilesystem fs, LFSDirectory dh):
return _raise_on_error(lfs_dir_close(&fs._impl, &dh._impl))

def dir_read(LFSFilesystem fs, LFSDirectory dh):
def dir_read(LFSFilesystem fs, LFSDirectory dh, filename_encoding=None):
filename_encoding = filename_encoding or FILENAME_ENCODING
cdef lfs_info * info = <lfs_info *>malloc(sizeof(lfs_info))
try:
retval = _raise_on_error(lfs_dir_read(&fs._impl, &dh._impl, info))
if retval == 0:
return None
return LFSStat(info.type, info.size, info.name.decode(FILENAME_ENCODING))
return LFSStat(info.type, info.size, info.name.decode(filename_encoding))
finally:
free(info)

Expand Down
47 changes: 47 additions & 0 deletions test/cli/test_create_and_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,53 @@
from littlefs.__main__ import main


def test_filename_encoding_roundtrip(tmp_path, capsys):
"""Create an image with a non-UTF-8 filename encoding and list it back.

"ÿ" is 0xFF in latin-1 but a 2-byte sequence in UTF-8, so the chosen
encoding must be honored on both the create (encode) and list (decode) side.
"""
source_dir = tmp_path / "source"
source_dir.mkdir()
name = "naïveÿ.txt"
(source_dir / name).write_text("hello")

image_file = tmp_path / "image.bin"
assert (
main(
[
"littlefs", "create", str(source_dir), str(image_file),
"--block-size", "512", "--fs-size", "64KB",
"--filename-encoding", "latin-1",
]
)
== 0
)

# Listing with the matching encoding round-trips the name.
assert (
main(
[
"littlefs", "list", str(image_file),
"--block-size", "512",
"--filename-encoding", "latin-1",
]
)
== 0
)
assert name in capsys.readouterr().out

# The on-disk name byte is 0xFF, which is invalid standalone UTF-8, so the
# default-encoding (utf-8) list fails loudly rather than silently mis-decoding.
with pytest.raises(UnicodeDecodeError):
main(
[
"littlefs", "list", str(image_file),
"--block-size", "512",
]
)


def test_create_and_extract(tmp_path):
"""Test creating a filesystem image and extracting it."""
# Create test directory with files
Expand Down
Loading
Loading